| /* Test program that performs producer-consumer style communication through |
| * a circular buffer. This test program is a slightly modified version of the |
| * test program made available by Miguel Ojeda |
| * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782. |
| */ |
| |
| |
| #include <stdio.h> |
| #include <string.h> |
| #include <stdlib.h> |
| #include <unistd.h> |
| #include <time.h> |
| #include <pthread.h> |
| #include <semaphore.h> |
| #include <fcntl.h> |
| #include "../../config.h" |
| |
| |
| /** gcc versions 4.1.0 and later have support for atomic builtins. */ |
| |
| #ifndef HAVE_BUILTIN_ATOMIC |
| #error Sorry, but this test program can only be compiled by a compiler that\ |
| has built-in functions for atomic memory access. |
| #endif |
| |
| |
| #define BUFFER_MAX (2) |
| #define DATA_SEMAPHORE_NAME "cb-data-semaphore" |
| #define FREE_SEMAPHORE_NAME "cb-free-semaphore" |
| |
| |
| typedef int data_t; |
| |
| typedef struct { |
| /* Counting semaphore representing the number of data items in the buffer. */ |
| sem_t* data; |
| /* Counting semaphore representing the number of free elements. */ |
| sem_t* free; |
| /* Position where a new elements should be written. */ |
| int in; |
| /* Position from where an element can be removed. */ |
| int out; |
| /* Mutex that protects 'in'. */ |
| pthread_mutex_t mutex_in; |
| /* Mutex that protects 'out'. */ |
| pthread_mutex_t mutex_out; |
| /* Data buffer. */ |
| data_t buffer[BUFFER_MAX]; |
| } buffer_t; |
| |
| static int quiet = 0; |
| static int use_locking = 1; |
| |
| static __inline__ |
| int fetch_and_add(int* p, int i) |
| { |
| return __sync_fetch_and_add(p, i); |
| } |
| |
| static sem_t* create_semaphore(const char* const name, const int value) |
| { |
| #ifdef VGO_darwin |
| char name_and_pid[32]; |
| snprintf(name_and_pid, sizeof(name_and_pid), "%s-%d", name, getpid()); |
| sem_t* p = sem_open(name_and_pid, O_CREAT | O_EXCL, 0600, value); |
| if (p == SEM_FAILED) { |
| perror("sem_open"); |
| return NULL; |
| } |
| return p; |
| #else |
| sem_t* p = malloc(sizeof(*p)); |
| if (p) |
| sem_init(p, 0, value); |
| return p; |
| #endif |
| } |
| |
| static void destroy_semaphore(const char* const name, sem_t* p) |
| { |
| #ifdef VGO_darwin |
| sem_close(p); |
| sem_unlink(name); |
| #else |
| sem_destroy(p); |
| free(p); |
| #endif |
| } |
| |
| static void buffer_init(buffer_t * b) |
| { |
| b->data = create_semaphore(DATA_SEMAPHORE_NAME, 0); |
| b->free = create_semaphore(FREE_SEMAPHORE_NAME, BUFFER_MAX); |
| |
| pthread_mutex_init(&b->mutex_in, NULL); |
| pthread_mutex_init(&b->mutex_out, NULL); |
| |
| b->in = 0; |
| b->out = 0; |
| } |
| |
| static void buffer_recv(buffer_t* b, data_t* d) |
| { |
| int out; |
| sem_wait(b->data); |
| if (use_locking) |
| pthread_mutex_lock(&b->mutex_out); |
| out = fetch_and_add(&b->out, 1); |
| if (out >= BUFFER_MAX) |
| { |
| fetch_and_add(&b->out, -BUFFER_MAX); |
| out -= BUFFER_MAX; |
| } |
| *d = b->buffer[out]; |
| if (use_locking) |
| pthread_mutex_unlock(&b->mutex_out); |
| if (! quiet) |
| { |
| printf("received %d from buffer[%d]\n", *d, out); |
| fflush(stdout); |
| } |
| sem_post(b->free); |
| } |
| |
| static void buffer_send(buffer_t* b, data_t* d) |
| { |
| int in; |
| sem_wait(b->free); |
| if (use_locking) |
| pthread_mutex_lock(&b->mutex_in); |
| in = fetch_and_add(&b->in, 1); |
| if (in >= BUFFER_MAX) |
| { |
| fetch_and_add(&b->in, -BUFFER_MAX); |
| in -= BUFFER_MAX; |
| } |
| b->buffer[in] = *d; |
| if (use_locking) |
| pthread_mutex_unlock(&b->mutex_in); |
| if (! quiet) |
| { |
| printf("sent %d to buffer[%d]\n", *d, in); |
| fflush(stdout); |
| } |
| sem_post(b->data); |
| } |
| |
| static void buffer_destroy(buffer_t* b) |
| { |
| destroy_semaphore(DATA_SEMAPHORE_NAME, b->data); |
| destroy_semaphore(FREE_SEMAPHORE_NAME, b->free); |
| |
| pthread_mutex_destroy(&b->mutex_in); |
| pthread_mutex_destroy(&b->mutex_out); |
| } |
| |
| static buffer_t b; |
| |
| static void producer(int* id) |
| { |
| buffer_send(&b, id); |
| pthread_exit(NULL); |
| } |
| |
| #define MAXSLEEP (100 * 1000) |
| |
| static void consumer(int* id) |
| { |
| int d; |
| usleep(rand() % MAXSLEEP); |
| buffer_recv(&b, &d); |
| if (! quiet) |
| { |
| printf("%i: %i\n", *id, d); |
| fflush(stdout); |
| } |
| pthread_exit(NULL); |
| } |
| |
| #define THREADS (10) |
| |
| int main(int argc, char** argv) |
| { |
| pthread_t producers[THREADS]; |
| pthread_t consumers[THREADS]; |
| int thread_arg[THREADS]; |
| int i; |
| int optchar; |
| |
| while ((optchar = getopt(argc, argv, "nq")) != EOF) |
| { |
| switch (optchar) |
| { |
| case 'n': |
| use_locking = 0; |
| break; |
| case 'q': |
| quiet = 1; |
| break; |
| } |
| } |
| |
| srand(time(NULL)); |
| |
| buffer_init(&b); |
| |
| for (i = 0; i < THREADS; ++i) |
| { |
| thread_arg[i] = i; |
| pthread_create(producers + i, NULL, |
| (void * (*)(void *)) producer, &thread_arg[i]); |
| } |
| |
| for (i = 0; i < THREADS; ++i) |
| pthread_create(consumers + i, NULL, |
| (void * (*)(void *)) consumer, &thread_arg[i]); |
| |
| for (i = 0; i < THREADS; ++i) |
| { |
| pthread_join(producers[i], NULL); |
| pthread_join(consumers[i], NULL); |
| } |
| |
| buffer_destroy(&b); |
| |
| return 0; |
| } |