| /* |
| * Read a file and write the contents to stdout. If a given read takes |
| * longer than 'max_us' time, then we schedule a new thread to handle |
| * the next read. This avoids the coordinated omission problem, where |
| * one request appears to take a long time, but in reality a lot of |
| * requests would have been slow, but we don't notice since new submissions |
| * are not being issued if just 1 is held up. |
| * |
| * One test case: |
| * |
| * $ time (./read-to-pipe-async -f randfile.gz | gzip -dc > outfile; sync) |
| * |
| * This will read randfile.gz and log the latencies of doing so, while |
| * piping the output to gzip to decompress it. Any latencies over max_us |
| * are logged when they happen, and latency buckets are displayed at the |
| * end of the run |
| * |
| * gcc -Wall -g -O2 -o read-to-pipe-async read-to-pipe-async.c -lpthread |
| * |
| * Copyright (C) 2016 Jens Axboe |
| * |
| */ |
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <assert.h>
| |
| #include "../flist.h" |
| |
/* Size in bytes of each read/write chunk; set with -b. */
static int bs = 4 * 1024;

/*
 * Latency threshold in microseconds; set with -t. I/Os slower than this
 * are logged and counted, and it is also how long main() waits for a
 * read before queueing the next one.
 */
static int max_us = 10 * 1000;

/* Path of the input file; set with -f, NULL until then. */
static char *file;

/* Non-zero: completed reads are handed to the writer thread; set with -w. */
static int separate_writer = 1;
| |
/*
 * Latency histogram geometry: PLAT_GROUP_NR groups of PLAT_VAL buckets
 * each, PLAT_NR buckets in total.
 */
#define PLAT_BITS	8
#define PLAT_VAL	(1 << PLAT_BITS)
#define PLAT_GROUP_NR	19
#define PLAT_NR		(PLAT_GROUP_NR * PLAT_VAL)

/* Capacity of the percentile table 'plist' */
#define PLAT_LIST_MAX	20
| |
| struct stats { |
| unsigned int plat[PLAT_NR]; |
| unsigned int nr_samples; |
| unsigned int max; |
| unsigned int min; |
| unsigned int over; |
| }; |
| |
| static double plist[PLAT_LIST_MAX] = { 50.0, 75.0, 90.0, 95.0, 99.0, 99.5, 99.9, 99.99, 99.999, 99.9999, }; |
| |
/*
 * Control state shared between a worker thread and the thread that
 * manages it.
 */
struct thread_data {
	/* Raised by exit_thread() to ask the worker to stop */
	int exit;
	/* Raised by thread_exiting() once the worker has left its loop */
	int done;
	/* Guards the worker's lists; 'cond' wakes the worker */
	pthread_mutex_t lock;
	pthread_cond_t cond;
	/* Used by exit_thread() to wait for 'done' */
	pthread_mutex_t done_lock;
	pthread_cond_t done_cond;
	/* Handle of the worker thread */
	pthread_t thread;
};
| |
| struct writer_thread { |
| struct flist_head list; |
| struct flist_head done_list; |
| struct stats s; |
| struct thread_data thread; |
| }; |
| |
| struct reader_thread { |
| struct flist_head list; |
| struct flist_head done_list; |
| int started; |
| int busy; |
| int write_seq; |
| struct stats s; |
| struct thread_data thread; |
| }; |
| |
| struct work_item { |
| struct flist_head list; |
| void *buf; |
| size_t buf_size; |
| off_t off; |
| int fd; |
| int seq; |
| struct writer_thread *writer; |
| struct reader_thread *reader; |
| pthread_mutex_t lock; |
| pthread_cond_t cond; |
| pthread_t thread; |
| }; |
| |
| static struct reader_thread reader_thread; |
| static struct writer_thread writer_thread; |
| |
/*
 * Return the number of microseconds that elapsed from 's' to 'e'.
 * If 'e' lies before 's' the result is 0.
 */
uint64_t utime_since(const struct timeval *s, const struct timeval *e)
{
	long d_sec = e->tv_sec - s->tv_sec;
	long d_usec = e->tv_usec - s->tv_usec;

	/* Borrow a second when the microsecond difference went negative */
	if (d_sec > 0 && d_usec < 0) {
		d_sec -= 1;
		d_usec += 1000000;
	}

	/* End precedes start: clamp rather than return a wrapped value */
	if (d_sec < 0)
		return 0;
	if (d_sec == 0 && d_usec < 0)
		return 0;

	return d_sec * 1000000ULL + d_usec;
}
| |
| static struct work_item *find_seq(struct writer_thread *w, unsigned int seq) |
| { |
| struct work_item *work; |
| struct flist_head *entry; |
| |
| if (flist_empty(&w->list)) |
| return NULL; |
| |
| flist_for_each(entry, &w->list) { |
| work = flist_entry(entry, struct work_item, list); |
| if (work->seq == seq) |
| return work; |
| } |
| |
| return NULL; |
| } |
| |
| static unsigned int plat_val_to_idx(unsigned int val) |
| { |
| unsigned int msb, error_bits, base, offset; |
| |
| /* Find MSB starting from bit 0 */ |
| if (val == 0) |
| msb = 0; |
| else |
| msb = sizeof(val)*8 - __builtin_clz(val) - 1; |
| |
| /* |
| * MSB <= (PLAT_BITS-1), cannot be rounded off. Use |
| * all bits of the sample as index |
| */ |
| if (msb <= PLAT_BITS) |
| return val; |
| |
| /* Compute the number of error bits to discard*/ |
| error_bits = msb - PLAT_BITS; |
| |
| /* Compute the number of buckets before the group */ |
| base = (error_bits + 1) << PLAT_BITS; |
| |
| /* |
| * Discard the error bits and apply the mask to find the |
| * index for the buckets in the group |
| */ |
| offset = (PLAT_VAL - 1) & (val >> error_bits); |
| |
| /* Make sure the index does not exceed (array size - 1) */ |
| return (base + offset) < (PLAT_NR - 1) ? |
| (base + offset) : (PLAT_NR - 1); |
| } |
| |
| /* |
| * Convert the given index of the bucket array to the value |
| * represented by the bucket |
| */ |
| static unsigned int plat_idx_to_val(unsigned int idx) |
| { |
| unsigned int error_bits, k, base; |
| |
| assert(idx < PLAT_NR); |
| |
| /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use |
| * all bits of the sample as index */ |
| if (idx < (PLAT_VAL << 1)) |
| return idx; |
| |
| /* Find the group and compute the minimum value of that group */ |
| error_bits = (idx >> PLAT_BITS) - 1; |
| base = 1 << (error_bits + PLAT_BITS); |
| |
| /* Find its bucket number of the group */ |
| k = idx % PLAT_VAL; |
| |
| /* Return the mean of the range of the bucket */ |
| return base + ((k + 0.5) * (1 << error_bits)); |
| } |
| |
| static void add_lat(struct stats *s, unsigned int us, const char *name) |
| { |
| int lat_index = 0; |
| |
| if (us > s->max) |
| s->max = us; |
| if (us < s->min) |
| s->min = us; |
| |
| if (us > max_us) { |
| fprintf(stderr, "%s latency=%u usec\n", name, us); |
| s->over++; |
| } |
| |
| lat_index = plat_val_to_idx(us); |
| __sync_fetch_and_add(&s->plat[lat_index], 1); |
| __sync_fetch_and_add(&s->nr_samples, 1); |
| } |
| |
| static int write_work(struct work_item *work) |
| { |
| struct timeval s, e; |
| ssize_t ret; |
| |
| gettimeofday(&s, NULL); |
| ret = write(STDOUT_FILENO, work->buf, work->buf_size); |
| gettimeofday(&e, NULL); |
| assert(ret == work->buf_size); |
| |
| add_lat(&work->writer->s, utime_since(&s, &e), "write"); |
| return work->seq + 1; |
| } |
| |
| static void thread_exiting(struct thread_data *thread) |
| { |
| __sync_fetch_and_add(&thread->done, 1); |
| pthread_cond_signal(&thread->done_cond); |
| } |
| |
| static void *writer_fn(void *data) |
| { |
| struct writer_thread *wt = data; |
| struct work_item *work; |
| unsigned int seq = 1; |
| |
| work = NULL; |
| while (!wt->thread.exit || !flist_empty(&wt->list)) { |
| pthread_mutex_lock(&wt->thread.lock); |
| |
| if (work) { |
| flist_add_tail(&work->list, &wt->done_list); |
| work = NULL; |
| } |
| |
| work = find_seq(wt, seq); |
| if (work) |
| flist_del_init(&work->list); |
| else |
| pthread_cond_wait(&wt->thread.cond, &wt->thread.lock); |
| |
| pthread_mutex_unlock(&wt->thread.lock); |
| |
| if (work) |
| seq = write_work(work); |
| } |
| |
| thread_exiting(&wt->thread); |
| return NULL; |
| } |
| |
| static void reader_work(struct work_item *work) |
| { |
| struct timeval s, e; |
| ssize_t ret; |
| size_t left; |
| void *buf; |
| off_t off; |
| |
| gettimeofday(&s, NULL); |
| |
| left = work->buf_size; |
| buf = work->buf; |
| off = work->off; |
| while (left) { |
| ret = pread(work->fd, buf, left, off); |
| if (!ret) { |
| fprintf(stderr, "zero read\n"); |
| break; |
| } else if (ret < 0) { |
| fprintf(stderr, "errno=%d\n", errno); |
| break; |
| } |
| left -= ret; |
| off += ret; |
| buf += ret; |
| } |
| |
| gettimeofday(&e, NULL); |
| |
| add_lat(&work->reader->s, utime_since(&s, &e), "read"); |
| |
| pthread_cond_signal(&work->cond); |
| |
| if (separate_writer) { |
| pthread_mutex_lock(&work->writer->thread.lock); |
| flist_add_tail(&work->list, &work->writer->list); |
| pthread_mutex_unlock(&work->writer->thread.lock); |
| pthread_cond_signal(&work->writer->thread.cond); |
| } else { |
| struct reader_thread *rt = work->reader; |
| struct work_item *next = NULL; |
| struct flist_head *entry; |
| |
| /* |
| * Write current work if it matches in sequence. |
| */ |
| if (work->seq == rt->write_seq) |
| goto write_it; |
| |
| pthread_mutex_lock(&rt->thread.lock); |
| |
| flist_add_tail(&work->list, &rt->done_list); |
| |
| /* |
| * See if the next work item is here, if so, write it |
| */ |
| work = NULL; |
| flist_for_each(entry, &rt->done_list) { |
| next = flist_entry(entry, struct work_item, list); |
| if (next->seq == rt->write_seq) { |
| work = next; |
| flist_del(&work->list); |
| break; |
| } |
| } |
| |
| pthread_mutex_unlock(&rt->thread.lock); |
| |
| if (work) { |
| write_it: |
| write_work(work); |
| __sync_fetch_and_add(&rt->write_seq, 1); |
| } |
| } |
| } |
| |
/*
 * Body of a one-off reader thread: handle the single work item passed in
 * 'data', then terminate.
 */
static void *reader_one_off(void *data)
{
	struct work_item *work = data;

	reader_work(work);
	return NULL;
}
| |
| static void *reader_fn(void *data) |
| { |
| struct reader_thread *rt = data; |
| struct work_item *work; |
| |
| while (!rt->thread.exit || !flist_empty(&rt->list)) { |
| work = NULL; |
| pthread_mutex_lock(&rt->thread.lock); |
| if (!flist_empty(&rt->list)) { |
| work = flist_first_entry(&rt->list, struct work_item, list); |
| flist_del_init(&work->list); |
| } else |
| pthread_cond_wait(&rt->thread.cond, &rt->thread.lock); |
| pthread_mutex_unlock(&rt->thread.lock); |
| |
| if (work) { |
| __sync_fetch_and_add(&rt->busy, 1); |
| reader_work(work); |
| __sync_fetch_and_sub(&rt->busy, 1); |
| } |
| } |
| |
| thread_exiting(&rt->thread); |
| return NULL; |
| } |
| |
| static void queue_work(struct reader_thread *rt, struct work_item *work) |
| { |
| if (!rt->started) { |
| pthread_mutex_lock(&rt->thread.lock); |
| flist_add_tail(&work->list, &rt->list); |
| pthread_mutex_unlock(&rt->thread.lock); |
| |
| rt->started = 1; |
| pthread_create(&rt->thread.thread, NULL, reader_fn, rt); |
| } else if (!rt->busy && !pthread_mutex_trylock(&rt->thread.lock)) { |
| flist_add_tail(&work->list, &rt->list); |
| pthread_mutex_unlock(&rt->thread.lock); |
| |
| pthread_cond_signal(&rt->thread.cond); |
| } else { |
| int ret = pthread_create(&work->thread, NULL, reader_one_off, work); |
| if (ret) |
| fprintf(stderr, "pthread_create=%d\n", ret); |
| else |
| pthread_detach(work->thread); |
| } |
| } |
| |
| static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr, |
| unsigned int **output) |
| { |
| unsigned long sum = 0; |
| unsigned int len, i, j = 0; |
| unsigned int oval_len = 0; |
| unsigned int *ovals = NULL; |
| int is_last; |
| |
| len = 0; |
| while (len < PLAT_LIST_MAX && plist[len] != 0.0) |
| len++; |
| |
| if (!len) |
| return 0; |
| |
| /* |
| * Calculate bucket values, note down max and min values |
| */ |
| is_last = 0; |
| for (i = 0; i < PLAT_NR && !is_last; i++) { |
| sum += io_u_plat[i]; |
| while (sum >= (plist[j] / 100.0 * nr)) { |
| assert(plist[j] <= 100.0); |
| |
| if (j == oval_len) { |
| oval_len += 100; |
| ovals = realloc(ovals, oval_len * sizeof(unsigned int)); |
| } |
| |
| ovals[j] = plat_idx_to_val(i); |
| is_last = (j == len - 1); |
| if (is_last) |
| break; |
| |
| j++; |
| } |
| } |
| |
| *output = ovals; |
| return len; |
| } |
| |
| static void show_latencies(struct stats *s, const char *msg) |
| { |
| unsigned int *ovals = NULL; |
| unsigned int len, i; |
| |
| len = calc_percentiles(s->plat, s->nr_samples, &ovals); |
| if (len) { |
| fprintf(stderr, "Latency percentiles (usec) (%s)\n", msg); |
| for (i = 0; i < len; i++) |
| fprintf(stderr, "\t%2.4fth: %u\n", plist[i], ovals[i]); |
| } |
| |
| if (ovals) |
| free(ovals); |
| |
| fprintf(stderr, "\tOver=%u, min=%u, max=%u\n", s->over, s->min, s->max); |
| } |
| |
| static void init_thread(struct thread_data *thread) |
| { |
| pthread_cond_init(&thread->cond, NULL); |
| pthread_cond_init(&thread->done_cond, NULL); |
| pthread_mutex_init(&thread->lock, NULL); |
| pthread_mutex_init(&thread->done_lock, NULL); |
| thread->exit = 0; |
| } |
| |
| static void exit_thread(struct thread_data *thread, |
| void fn(struct writer_thread *), |
| struct writer_thread *wt) |
| { |
| __sync_fetch_and_add(&thread->exit, 1); |
| pthread_cond_signal(&thread->cond); |
| |
| while (!thread->done) { |
| pthread_mutex_lock(&thread->done_lock); |
| |
| if (fn) { |
| struct timeval tv; |
| struct timespec ts; |
| |
| gettimeofday(&tv, NULL); |
| ts.tv_sec = tv.tv_sec + 1; |
| ts.tv_nsec = tv.tv_usec * 1000ULL; |
| |
| pthread_cond_timedwait(&thread->done_cond, &thread->done_lock, &ts); |
| fn(wt); |
| } else |
| pthread_cond_wait(&thread->done_cond, &thread->done_lock); |
| |
| pthread_mutex_unlock(&thread->done_lock); |
| } |
| } |
| |
/*
 * Print the command line synopsis to stderr, using argv[0] as the
 * program name. Always returns 1 so callers can return it directly.
 */
static int usage(char *argv[])
{
	const char *prog = argv[0];

	fprintf(stderr,
		"%s: [-b blocksize] [-t max usec] [-w separate writer] -f file\n",
		prog);
	return 1;
}
| |
| static int parse_options(int argc, char *argv[]) |
| { |
| int c; |
| |
| while ((c = getopt(argc, argv, "f:b:t:w:")) != -1) { |
| switch (c) { |
| case 'f': |
| file = strdup(optarg); |
| break; |
| case 'b': |
| bs = atoi(optarg); |
| break; |
| case 't': |
| max_us = atoi(optarg); |
| break; |
| case 'w': |
| separate_writer = atoi(optarg); |
| if (!separate_writer) |
| fprintf(stderr, "inline writing is broken\n"); |
| break; |
| case '?': |
| default: |
| return usage(argv); |
| } |
| } |
| |
| if (!file) |
| return usage(argv); |
| |
| return 0; |
| } |
| |
| static void prune_done_entries(struct writer_thread *wt) |
| { |
| FLIST_HEAD(list); |
| |
| if (flist_empty(&wt->done_list)) |
| return; |
| |
| if (pthread_mutex_trylock(&wt->thread.lock)) |
| return; |
| |
| if (!flist_empty(&wt->done_list)) |
| flist_splice_init(&wt->done_list, &list); |
| pthread_mutex_unlock(&wt->thread.lock); |
| |
| while (!flist_empty(&list)) { |
| struct work_item *work; |
| |
| work = flist_first_entry(&list, struct work_item, list); |
| flist_del(&work->list); |
| |
| pthread_cond_destroy(&work->cond); |
| pthread_mutex_destroy(&work->lock); |
| free(work->buf); |
| free(work); |
| } |
| } |
| |
| int main(int argc, char *argv[]) |
| { |
| struct timeval s, re, we; |
| struct reader_thread *rt; |
| struct writer_thread *wt; |
| unsigned long rate; |
| struct stat sb; |
| size_t bytes; |
| off_t off; |
| int fd, seq; |
| |
| if (parse_options(argc, argv)) |
| return 1; |
| |
| fd = open(file, O_RDONLY); |
| if (fd < 0) { |
| perror("open"); |
| return 2; |
| } |
| |
| if (fstat(fd, &sb) < 0) { |
| perror("stat"); |
| return 3; |
| } |
| |
| wt = &writer_thread; |
| init_thread(&wt->thread); |
| INIT_FLIST_HEAD(&wt->list); |
| INIT_FLIST_HEAD(&wt->done_list); |
| wt->s.max = 0; |
| wt->s.min = -1U; |
| pthread_create(&wt->thread.thread, NULL, writer_fn, wt); |
| |
| rt = &reader_thread; |
| init_thread(&rt->thread); |
| INIT_FLIST_HEAD(&rt->list); |
| INIT_FLIST_HEAD(&rt->done_list); |
| rt->s.max = 0; |
| rt->s.min = -1U; |
| rt->write_seq = 1; |
| |
| off = 0; |
| seq = 0; |
| bytes = 0; |
| |
| gettimeofday(&s, NULL); |
| |
| while (sb.st_size) { |
| struct work_item *work; |
| size_t this_len; |
| struct timespec ts; |
| struct timeval tv; |
| |
| prune_done_entries(wt); |
| |
| this_len = sb.st_size; |
| if (this_len > bs) |
| this_len = bs; |
| |
| work = calloc(1, sizeof(*work)); |
| work->buf = malloc(this_len); |
| work->buf_size = this_len; |
| work->off = off; |
| work->fd = fd; |
| work->seq = ++seq; |
| work->writer = wt; |
| work->reader = rt; |
| pthread_cond_init(&work->cond, NULL); |
| pthread_mutex_init(&work->lock, NULL); |
| |
| queue_work(rt, work); |
| |
| gettimeofday(&tv, NULL); |
| ts.tv_sec = tv.tv_sec; |
| ts.tv_nsec = tv.tv_usec * 1000ULL; |
| ts.tv_nsec += max_us * 1000ULL; |
| if (ts.tv_nsec >= 1000000000ULL) { |
| ts.tv_nsec -= 1000000000ULL; |
| ts.tv_sec++; |
| } |
| |
| pthread_mutex_lock(&work->lock); |
| pthread_cond_timedwait(&work->cond, &work->lock, &ts); |
| pthread_mutex_unlock(&work->lock); |
| |
| off += this_len; |
| sb.st_size -= this_len; |
| bytes += this_len; |
| } |
| |
| exit_thread(&rt->thread, NULL, NULL); |
| gettimeofday(&re, NULL); |
| |
| exit_thread(&wt->thread, prune_done_entries, wt); |
| gettimeofday(&we, NULL); |
| |
| show_latencies(&rt->s, "READERS"); |
| show_latencies(&wt->s, "WRITERS"); |
| |
| bytes /= 1024; |
| rate = (bytes * 1000UL * 1000UL) / utime_since(&s, &re); |
| fprintf(stderr, "Read rate (KiB/sec) : %lu\n", rate); |
| rate = (bytes * 1000UL * 1000UL) / utime_since(&s, &we); |
| fprintf(stderr, "Write rate (KiB/sec): %lu\n", rate); |
| |
| close(fd); |
| return 0; |
| } |