blob: 089bd3bb1e1c150cfd877838c1e56c6d45e4af9d [file] [log] [blame]
Alistair Veitch85afe712016-02-02 17:58:15 -08001/*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/census/log.h"
35#include <grpc/support/cpu.h>
36#include <grpc/support/log.h>
37#include <grpc/support/port_platform.h>
38#include <grpc/support/sync.h>
39#include <grpc/support/thd.h>
40#include <grpc/support/time.h>
41#include <grpc/support/useful.h>
42#include <stdio.h>
43#include <stdlib.h>
44#include <string.h>
45#include "test/core/util/test_config.h"
46
47/* Change this to non-zero if you want more output. */
48#define VERBOSE 0
49
50/* Log size to use for all tests. */
51#define LOG_SIZE_IN_MB 1
52#define LOG_SIZE_IN_BYTES (LOG_SIZE_IN_MB << 20)
53
54/* Fills in 'record' of size 'size'. Each byte in record is filled in with the
55 same value. The value is extracted from 'record' pointer. */
56static void write_record(char* record, size_t size) {
57 char data = (char)((uintptr_t)record % 255);
58 memset(record, data, size);
59}
60
61/* Reads fixed size records. Returns the number of records read in
62 'num_records'. */
63static void read_records(size_t record_size, const char* buffer,
64 size_t buffer_size, int* num_records) {
65 GPR_ASSERT(buffer_size >= record_size);
66 GPR_ASSERT(buffer_size % record_size == 0);
67 *num_records = (int)(buffer_size / record_size);
68 for (int i = 0; i < *num_records; ++i) {
69 const char* record = buffer + (record_size * (size_t)i);
70 char data = (char)((uintptr_t)record % 255);
71 for (size_t j = 0; j < record_size; ++j) {
72 GPR_ASSERT(data == record[j]);
73 }
74 }
75}
76
77/* Tries to write the specified number of records. Stops when the log gets
78 full. Returns the number of records written. Spins for random
79 number of times, up to 'max_spin_count', between writes. */
80static int write_records_to_log(int writer_id, size_t record_size,
81 int num_records, int max_spin_count) {
82 int counter = 0;
83 for (int i = 0; i < num_records; ++i) {
84 int spin_count = max_spin_count ? rand() % max_spin_count : 0;
85 if (VERBOSE && (counter++ == num_records / 10)) {
86 printf(" Writer %d: %d out of %d written\n", writer_id, i, num_records);
87 counter = 0;
88 }
89 char* record = (char*)(census_log_start_write(record_size));
90 if (record == NULL) {
91 return i;
92 }
93 write_record(record, record_size);
94 census_log_end_write(record, record_size);
95 for (int j = 0; j < spin_count; ++j) {
96 GPR_ASSERT(j >= 0);
97 }
98 }
99 return num_records;
100}
101
102/* Performs a single read iteration. Returns the number of records read. */
103static int perform_read_iteration(size_t record_size) {
104 const void* read_buffer = NULL;
105 size_t bytes_available;
106 int records_read = 0;
107 census_log_init_reader();
108 while ((read_buffer = census_log_read_next(&bytes_available))) {
109 int num_records = 0;
110 read_records(record_size, (const char*)read_buffer, bytes_available,
111 &num_records);
112 records_read += num_records;
113 }
114 return records_read;
115}
116
117/* Asserts that the log is empty. */
118static void assert_log_empty(void) {
119 census_log_init_reader();
120 size_t bytes_available;
121 GPR_ASSERT(census_log_read_next(&bytes_available) == NULL);
122}
123
124/* Fills the log and verifies data. If 'no fragmentation' is true, records
125 are sized such that CENSUS_LOG_2_MAX_RECORD_SIZE is a multiple of record
126 size. If not a circular log, verifies that the number of records written
127 match the number of records read. */
128static void fill_log(size_t log_size, int no_fragmentation, int circular_log) {
129 size_t size;
130 if (no_fragmentation) {
131 int log2size = rand() % (CENSUS_LOG_2_MAX_RECORD_SIZE + 1);
132 size = ((size_t)1 << log2size);
133 } else {
134 while (1) {
135 size = 1 + ((size_t)rand() % CENSUS_LOG_MAX_RECORD_SIZE);
136 if (CENSUS_LOG_MAX_RECORD_SIZE % size) {
137 break;
138 }
139 }
140 }
141 int records_written =
142 write_records_to_log(0 /* writer id */, size,
143 (int)((log_size / size) * 2), 0 /* spin count */);
144 int records_read = perform_read_iteration(size);
145 if (!circular_log) {
146 GPR_ASSERT(records_written == records_read);
147 }
148 assert_log_empty();
149}
150
151/* Structure to pass args to writer_thread */
152typedef struct writer_thread_args {
153 /* Index of this thread in the writers vector. */
154 int index;
155 /* Record size. */
156 size_t record_size;
157 /* Number of records to write. */
158 int num_records;
159 /* Used to signal when writer is complete */
160 gpr_cv* done;
161 gpr_mu* mu;
162 int* count;
163} writer_thread_args;
164
165/* Writes the given number of records of random size (up to kMaxRecordSize) and
166 random data to the specified log. */
167static void writer_thread(void* arg) {
168 writer_thread_args* args = (writer_thread_args*)arg;
169 /* Maximum number of times to spin between writes. */
170 static const int MAX_SPIN_COUNT = 50;
171 int records_written = 0;
172 if (VERBOSE) {
173 printf(" Writer %d starting\n", args->index);
174 }
175 while (records_written < args->num_records) {
176 records_written += write_records_to_log(args->index, args->record_size,
177 args->num_records - records_written,
178 MAX_SPIN_COUNT);
179 if (records_written < args->num_records) {
180 /* Ran out of log space. Sleep for a bit and let the reader catch up.
181 This should never happen for circular logs. */
182 if (VERBOSE) {
Alistair Veitch49653c52016-02-05 11:59:29 -0800183 printf(
184 " Writer %d stalled due to out-of-space: %d out of %d written\n",
185 args->index, records_written, args->num_records);
Alistair Veitch85afe712016-02-02 17:58:15 -0800186 }
187 gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
188 }
189 }
190 /* Done. Decrement count and signal. */
191 gpr_mu_lock(args->mu);
192 (*args->count)--;
Alistair Veitch49653c52016-02-05 11:59:29 -0800193 gpr_cv_signal(args->done);
Alistair Veitch85afe712016-02-02 17:58:15 -0800194 if (VERBOSE) {
195 printf(" Writer %d done\n", args->index);
196 }
197 gpr_mu_unlock(args->mu);
198}
199
200/* struct to pass args to reader_thread */
201typedef struct reader_thread_args {
202 /* Record size. */
203 size_t record_size;
204 /* Interval between read iterations. */
205 int read_iteration_interval_in_msec;
206 /* Total number of records. */
207 int total_records;
208 /* Signalled when reader should stop. */
209 gpr_cv stop;
210 int stop_flag;
211 /* Used to signal when reader has finished */
212 gpr_cv* done;
213 gpr_mu* mu;
214 int running;
215} reader_thread_args;
216
217/* Reads and verifies the specified number of records. Reader can also be
218 stopped via gpr_cv_signal(&args->stop). Sleeps for 'read_interval_in_msec'
219 between read iterations. */
220static void reader_thread(void* arg) {
221 reader_thread_args* args = (reader_thread_args*)arg;
222 if (VERBOSE) {
223 printf(" Reader starting\n");
224 }
225 gpr_timespec interval = gpr_time_from_micros(
226 args->read_iteration_interval_in_msec * 1000, GPR_TIMESPAN);
227 gpr_mu_lock(args->mu);
228 int records_read = 0;
229 int num_iterations = 0;
230 int counter = 0;
231 while (!args->stop_flag && records_read < args->total_records) {
232 gpr_cv_wait(&args->stop, args->mu, interval);
233 if (!args->stop_flag) {
234 records_read += perform_read_iteration(args->record_size);
235 GPR_ASSERT(records_read <= args->total_records);
236 if (VERBOSE && (counter++ == 100000)) {
237 printf(" Reader: %d out of %d read\n", records_read,
238 args->total_records);
239 counter = 0;
240 }
241 ++num_iterations;
242 }
243 }
244 /* Done */
245 args->running = 0;
Alistair Veitch49653c52016-02-05 11:59:29 -0800246 gpr_cv_signal(args->done);
Alistair Veitch85afe712016-02-02 17:58:15 -0800247 if (VERBOSE) {
248 printf(" Reader: records: %d, iterations: %d\n", records_read,
249 num_iterations);
250 }
251 gpr_mu_unlock(args->mu);
252}
253
254/* Creates NUM_WRITERS writers where each writer writes NUM_RECORDS_PER_WRITER
255 records. Also, starts a reader that iterates over and reads blocks every
256 READ_ITERATION_INTERVAL_IN_MSEC. */
257/* Number of writers. */
258#define NUM_WRITERS 5
259static void multiple_writers_single_reader(int circular_log) {
260 /* Sleep interval between read iterations. */
261 static const int READ_ITERATION_INTERVAL_IN_MSEC = 10;
262 /* Maximum record size. */
263 static const size_t MAX_RECORD_SIZE = 20;
264 /* Number of records written by each writer. This is sized such that we
265 will write through the entire log ~10 times. */
266 const int NUM_RECORDS_PER_WRITER =
267 (int)((10 * census_log_remaining_space()) / (MAX_RECORD_SIZE / 2)) /
268 NUM_WRITERS;
269 size_t record_size = ((size_t)rand() % MAX_RECORD_SIZE) + 1;
270 /* Create and start writers. */
271 writer_thread_args writers[NUM_WRITERS];
272 int writers_count = NUM_WRITERS;
273 gpr_cv writers_done;
274 gpr_mu writers_mu; /* protects writers_done and writers_count */
275 gpr_cv_init(&writers_done);
276 gpr_mu_init(&writers_mu);
277 gpr_thd_id id;
278 for (int i = 0; i < NUM_WRITERS; ++i) {
279 writers[i].index = i;
280 writers[i].record_size = record_size;
281 writers[i].num_records = NUM_RECORDS_PER_WRITER;
282 writers[i].done = &writers_done;
283 writers[i].count = &writers_count;
284 writers[i].mu = &writers_mu;
285 gpr_thd_new(&id, &writer_thread, &writers[i], NULL);
286 }
287 /* Start reader. */
288 gpr_cv reader_done;
289 gpr_mu reader_mu; /* protects reader_done and reader.running */
290 reader_thread_args reader;
291 reader.record_size = record_size;
292 reader.read_iteration_interval_in_msec = READ_ITERATION_INTERVAL_IN_MSEC;
293 reader.total_records = NUM_WRITERS * NUM_RECORDS_PER_WRITER;
294 reader.stop_flag = 0;
295 gpr_cv_init(&reader.stop);
296 gpr_cv_init(&reader_done);
297 reader.done = &reader_done;
298 gpr_mu_init(&reader_mu);
299 reader.mu = &reader_mu;
300 reader.running = 1;
301 gpr_thd_new(&id, &reader_thread, &reader, NULL);
302 /* Wait for writers to finish. */
303 gpr_mu_lock(&writers_mu);
304 while (writers_count != 0) {
305 gpr_cv_wait(&writers_done, &writers_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
306 }
307 gpr_mu_unlock(&writers_mu);
308 gpr_mu_destroy(&writers_mu);
309 gpr_cv_destroy(&writers_done);
310 gpr_mu_lock(&reader_mu);
311 if (circular_log) {
312 /* Stop reader. */
313 reader.stop_flag = 1;
314 gpr_cv_signal(&reader.stop);
315 }
316 /* wait for reader to finish */
317 while (reader.running) {
318 gpr_cv_wait(&reader_done, &reader_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
319 }
320 if (circular_log) {
321 /* Assert that there were no out-of-space errors. */
322 GPR_ASSERT(0 == census_log_out_of_space_count());
323 }
324 gpr_mu_unlock(&reader_mu);
325 gpr_mu_destroy(&reader_mu);
326 gpr_cv_destroy(&reader_done);
327 if (VERBOSE) {
328 printf(" Reader: finished\n");
329 }
330}
331
332static void setup_test(int circular_log) {
333 census_log_initialize(LOG_SIZE_IN_MB, circular_log);
334 GPR_ASSERT(census_log_remaining_space() == LOG_SIZE_IN_BYTES);
335}
336
337/* Attempts to create a record of invalid size (size >
338 CENSUS_LOG_MAX_RECORD_SIZE). */
339void test_invalid_record_size(void) {
340 static const size_t INVALID_SIZE = CENSUS_LOG_MAX_RECORD_SIZE + 1;
341 static const size_t VALID_SIZE = 1;
342 printf("Starting test: invalid record size\n");
343 setup_test(0);
344 void* record = census_log_start_write(INVALID_SIZE);
345 GPR_ASSERT(record == NULL);
346 /* Now try writing a valid record. */
347 record = census_log_start_write(VALID_SIZE);
348 GPR_ASSERT(record != NULL);
349 census_log_end_write(record, VALID_SIZE);
350 /* Verifies that available space went down by one block. In theory, this
351 check can fail if the thread is context switched to a new CPU during the
352 start_write execution (multiple blocks get allocated), but this has not
353 been observed in practice. */
354 GPR_ASSERT(LOG_SIZE_IN_BYTES - CENSUS_LOG_MAX_RECORD_SIZE ==
355 census_log_remaining_space());
356 census_log_shutdown();
357}
358
359/* Tests end_write() with a different size than what was specified in
360 start_write(). */
361void test_end_write_with_different_size(void) {
362 static const size_t START_WRITE_SIZE = 10;
363 static const size_t END_WRITE_SIZE = 7;
364 printf("Starting test: end write with different size\n");
365 setup_test(0);
366 void* record_written = census_log_start_write(START_WRITE_SIZE);
367 GPR_ASSERT(record_written != NULL);
368 census_log_end_write(record_written, END_WRITE_SIZE);
369 census_log_init_reader();
370 size_t bytes_available;
371 const void* record_read = census_log_read_next(&bytes_available);
372 GPR_ASSERT(record_written == record_read);
373 GPR_ASSERT(END_WRITE_SIZE == bytes_available);
374 assert_log_empty();
375 census_log_shutdown();
376}
377
378/* Verifies that pending records are not available via read_next(). */
379void test_read_pending_record(void) {
380 static const size_t PR_RECORD_SIZE = 1024;
381 printf("Starting test: read pending record\n");
382 setup_test(0);
383 /* Start a write. */
384 void* record_written = census_log_start_write(PR_RECORD_SIZE);
385 GPR_ASSERT(record_written != NULL);
386 /* As write is pending, read should fail. */
387 census_log_init_reader();
388 size_t bytes_available;
389 const void* record_read = census_log_read_next(&bytes_available);
390 GPR_ASSERT(record_read == NULL);
391 /* A read followed by end_write() should succeed. */
392 census_log_end_write(record_written, PR_RECORD_SIZE);
393 census_log_init_reader();
394 record_read = census_log_read_next(&bytes_available);
395 GPR_ASSERT(record_written == record_read);
396 GPR_ASSERT(PR_RECORD_SIZE == bytes_available);
397 assert_log_empty();
398 census_log_shutdown();
399}
400
401/* Tries reading beyond pending write. */
402void test_read_beyond_pending_record(void) {
403 printf("Starting test: read beyond pending record\n");
404 setup_test(0);
405 /* Start a write. */
406 const size_t incomplete_record_size = 10;
407 void* incomplete_record = census_log_start_write(incomplete_record_size);
408 GPR_ASSERT(incomplete_record != NULL);
409 const size_t complete_record_size = 20;
410 void* complete_record = census_log_start_write(complete_record_size);
411 GPR_ASSERT(complete_record != NULL);
412 GPR_ASSERT(complete_record != incomplete_record);
413 census_log_end_write(complete_record, complete_record_size);
414 /* Now iterate over blocks to read completed records. */
415 census_log_init_reader();
416 size_t bytes_available;
417 const void* record_read = census_log_read_next(&bytes_available);
418 GPR_ASSERT(complete_record == record_read);
419 GPR_ASSERT(complete_record_size == bytes_available);
420 /* Complete first record. */
421 census_log_end_write(incomplete_record, incomplete_record_size);
422 /* Have read past the incomplete record, so read_next() should return NULL. */
423 /* NB: this test also assumes our thread did not get switched to a different
424 CPU between the two start_write calls */
425 record_read = census_log_read_next(&bytes_available);
426 GPR_ASSERT(record_read == NULL);
427 /* Reset reader to get the newly completed record. */
428 census_log_init_reader();
429 record_read = census_log_read_next(&bytes_available);
430 GPR_ASSERT(incomplete_record == record_read);
431 GPR_ASSERT(incomplete_record_size == bytes_available);
432 assert_log_empty();
433 census_log_shutdown();
434}
435
436/* Tests scenario where block being read is detached from a core and put on the
437 dirty list. */
438void test_detached_while_reading(void) {
439 printf("Starting test: detached while reading\n");
440 setup_test(0);
441 /* Start a write. */
442 static const size_t DWR_RECORD_SIZE = 10;
443 void* record_written = census_log_start_write(DWR_RECORD_SIZE);
444 GPR_ASSERT(record_written != NULL);
445 census_log_end_write(record_written, DWR_RECORD_SIZE);
446 /* Read this record. */
447 census_log_init_reader();
448 size_t bytes_available;
449 const void* record_read = census_log_read_next(&bytes_available);
450 GPR_ASSERT(record_read != NULL);
451 GPR_ASSERT(DWR_RECORD_SIZE == bytes_available);
452 /* Now fill the log. This will move the block being read from core-local
453 array to the dirty list. */
454 while ((record_written = census_log_start_write(DWR_RECORD_SIZE))) {
455 census_log_end_write(record_written, DWR_RECORD_SIZE);
456 }
457
458 /* In this iteration, read_next() should only traverse blocks in the
459 core-local array. Therefore, we expect at most gpr_cpu_num_cores() more
460 blocks. As log is full, if read_next() is traversing the dirty list, we
461 will get more than gpr_cpu_num_cores() blocks. */
462 int block_read = 0;
463 while ((record_read = census_log_read_next(&bytes_available))) {
464 ++block_read;
465 GPR_ASSERT(block_read <= (int)gpr_cpu_num_cores());
466 }
467 census_log_shutdown();
468}
469
470/* Fills non-circular log with records sized such that size is a multiple of
471 CENSUS_LOG_MAX_RECORD_SIZE (no per-block fragmentation). */
472void test_fill_log_no_fragmentation(void) {
473 printf("Starting test: fill log no fragmentation\n");
474 const int circular = 0;
475 setup_test(circular);
476 fill_log(LOG_SIZE_IN_BYTES, 1 /* no fragmentation */, circular);
477 census_log_shutdown();
478}
479
480/* Fills circular log with records sized such that size is a multiple of
481 CENSUS_LOG_MAX_RECORD_SIZE (no per-block fragmentation). */
482void test_fill_circular_log_no_fragmentation(void) {
483 printf("Starting test: fill circular log no fragmentation\n");
484 const int circular = 1;
485 setup_test(circular);
486 fill_log(LOG_SIZE_IN_BYTES, 1 /* no fragmentation */, circular);
487 census_log_shutdown();
488}
489
490/* Fills non-circular log with records that may straddle end of a block. */
491void test_fill_log_with_straddling_records(void) {
492 printf("Starting test: fill log with straddling records\n");
493 const int circular = 0;
494 setup_test(circular);
495 fill_log(LOG_SIZE_IN_BYTES, 0 /* block straddling records */, circular);
496 census_log_shutdown();
497}
498
499/* Fills circular log with records that may straddle end of a block. */
500void test_fill_circular_log_with_straddling_records(void) {
501 printf("Starting test: fill circular log with straddling records\n");
502 const int circular = 1;
503 setup_test(circular);
504 fill_log(LOG_SIZE_IN_BYTES, 0 /* block straddling records */, circular);
505 census_log_shutdown();
506}
507
508/* Tests scenario where multiple writers and a single reader are using a log
509 that is configured to discard old records. */
510void test_multiple_writers_circular_log(void) {
511 printf("Starting test: multiple writers circular log\n");
512 const int circular = 1;
513 setup_test(circular);
514 multiple_writers_single_reader(circular);
515 census_log_shutdown();
516}
517
518/* Tests scenario where multiple writers and a single reader are using a log
519 that is configured to discard old records. */
520void test_multiple_writers(void) {
521 printf("Starting test: multiple writers\n");
522 const int circular = 0;
523 setup_test(circular);
524 multiple_writers_single_reader(circular);
525 census_log_shutdown();
526}
527
528/* Repeat the straddling records and multiple writers tests with a small log. */
529void test_small_log(void) {
530 printf("Starting test: small log\n");
531 const int circular = 0;
532 census_log_initialize(0, circular);
533 size_t log_size = census_log_remaining_space();
534 GPR_ASSERT(log_size > 0);
535 fill_log(log_size, 0, circular);
536 census_log_shutdown();
537 census_log_initialize(0, circular);
538 multiple_writers_single_reader(circular);
539 census_log_shutdown();
540}
541
542void test_performance(void) {
543 for (size_t write_size = 1; write_size < CENSUS_LOG_MAX_RECORD_SIZE;
544 write_size *= 2) {
545 setup_test(0);
546 gpr_timespec start_time = gpr_now(GPR_CLOCK_REALTIME);
547 int nrecords = 0;
548 while (1) {
549 void* record = census_log_start_write(write_size);
550 if (record == NULL) {
551 break;
552 }
553 census_log_end_write(record, write_size);
554 nrecords++;
555 }
556 gpr_timespec write_time =
557 gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), start_time);
558 double write_time_micro =
559 (double)write_time.tv_sec * 1000000 + (double)write_time.tv_nsec / 1000;
560 census_log_shutdown();
561 printf(
562 "Wrote %d %d byte records in %.3g microseconds: %g records/us "
563 "(%g ns/record), %g gigabytes/s\n",
564 nrecords, (int)write_size, write_time_micro,
565 nrecords / write_time_micro, 1000 * write_time_micro / nrecords,
566 (double)((int)write_size * nrecords) / write_time_micro / 1000);
567 }
568}
569
570int main(int argc, char** argv) {
571 grpc_test_init(argc, argv);
Alistair Veitch49653c52016-02-05 11:59:29 -0800572 gpr_time_init();
Alistair Veitch85afe712016-02-02 17:58:15 -0800573 srand((unsigned)gpr_now(GPR_CLOCK_REALTIME).tv_nsec);
574 test_invalid_record_size();
575 test_end_write_with_different_size();
576 test_read_pending_record();
577 test_read_beyond_pending_record();
578 test_detached_while_reading();
579 test_fill_log_no_fragmentation();
580 test_fill_circular_log_no_fragmentation();
581 test_fill_log_with_straddling_records();
582 test_fill_circular_log_with_straddling_records();
583 test_small_log();
584 test_multiple_writers();
585 test_multiple_writers_circular_log();
586 return 0;
587}