blob: 9a7238cb43b74dd81d537c80b2d5e39b7ee9c196 [file] [log] [blame]
/* Test program that performs producer-consumer style communication through
 * a circular buffer. This test program is a slightly modified version of the
 * test program made available by Miguel Ojeda
 * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782.
 */
6
7
8#include <stdio.h>
9#include <string.h>
10#include <stdlib.h>
11#include <unistd.h>
12#include <time.h>
13#include <pthread.h>
14#include <semaphore.h>
bartd94169a2008-12-21 17:16:03 +000015#include "../../config.h"
16
17
18/** gcc versions 4.1.0 and later have support for atomic builtins. */
bartd45d9952009-05-31 18:53:54 +000019
20#ifndef HAVE_BUILTIN_ATOMIC
21#error Sorry, but this test program can only be compiled by a compiler that\
22has built-in functions for atomic memory access.
23#endif
bartd94169a2008-12-21 17:16:03 +000024
bart6fd7d742008-12-17 19:21:17 +000025
/* Number of slots in the circular buffer. */
#define BUFFER_MAX (2)
/* Names handed to create_semaphore() and destroy_semaphore(); only the
 * __APPLE__ code paths, which work with named semaphores, look at them. */
#define DATA_SEMAPHORE_NAME "cb-data-semaphore"
#define FREE_SEMAPHORE_NAME "cb-free-semaphore"


/* Type of the items that travel through the circular buffer. */
typedef int data_t;
32
33typedef struct {
34 /* Counting semaphore representing the number of data items in the buffer. */
bart6e38cb22009-07-23 18:22:00 +000035 sem_t* data;
bart6fd7d742008-12-17 19:21:17 +000036 /* Counting semaphore representing the number of free elements. */
bart6e38cb22009-07-23 18:22:00 +000037 sem_t* free;
bart6fd7d742008-12-17 19:21:17 +000038 /* Position where a new elements should be written. */
bartd94169a2008-12-21 17:16:03 +000039 int in;
bart6fd7d742008-12-17 19:21:17 +000040 /* Position from where an element can be removed. */
bartd94169a2008-12-21 17:16:03 +000041 int out;
bart6fd7d742008-12-17 19:21:17 +000042 /* Mutex that protects 'in'. */
43 pthread_mutex_t mutex_in;
44 /* Mutex that protects 'out'. */
45 pthread_mutex_t mutex_out;
46 /* Data buffer. */
47 data_t buffer[BUFFER_MAX];
48} buffer_t;
49
/* Nonzero suppresses the progress output; main() sets it for option -q. */
static int quiet = 0;
/* Nonzero makes buffer_send() and buffer_recv() lock the index mutexes;
 * main() clears it for option -n. */
static int use_locking = 1;
bart6fd7d742008-12-17 19:21:17 +000052
/**
 * Atomically add 'i' to the integer 'p' points at, using the compiler's
 * __sync_fetch_and_add() builtin.
 *
 * @return The value *p held immediately before the addition.
 */
static __inline__
int fetch_and_add(int* p, int i)
{
  const int previous = __sync_fetch_and_add(p, i);

  return previous;
}
58
/**
 * Create a counting semaphore whose initial count is 'value'.
 *
 * On __APPLE__ builds a named semaphore called 'name' is opened with
 * sem_open(). Elsewhere 'name' is not used and an unnamed semaphore is
 * initialized in memory obtained from malloc().
 *
 * @param name  Semaphore name (only used by the __APPLE__ code path).
 * @param value Initial count.
 * @return The new semaphore, or NULL if it could not be created. A non-NULL
 *         result is released with destroy_semaphore().
 */
static sem_t* create_semaphore(const char* const name, const int value)
{
#ifdef __APPLE__
  sem_t* p = sem_open(name, O_CREAT, 0600, value);
  /* sem_open() reports failure through SEM_FAILED; translate that to NULL
   * so that both code paths signal failure the same way. */
  if (p == SEM_FAILED)
    return NULL;
  return p;
#else
  sem_t* p;

  (void) name;
  p = malloc(sizeof(*p));
  if (p == NULL)
    return NULL;
  if (sem_init(p, 0, value) != 0)
  {
    /* Never hand out memory that does not hold an initialized semaphore. */
    free(p);
    return NULL;
  }
  return p;
#endif
}
71
/**
 * Release a semaphore obtained from create_semaphore().
 *
 * On __APPLE__ builds the semaphore is closed and the name 'name' is
 * unlinked. Elsewhere the semaphore is destroyed and its memory is freed.
 *
 * @param name Semaphore name (only used by the __APPLE__ code path).
 * @param p    Semaphore to release. NULL and SEM_FAILED, the values a failed
 *             malloc() or sem_open() produce, are ignored.
 */
static void destroy_semaphore(const char* const name, sem_t* p)
{
  /* A failed creation leaves nothing behind that could be released. */
  if (p == NULL || p == SEM_FAILED)
    return;
#ifdef __APPLE__
  sem_close(p);
  sem_unlink(name);
#else
  (void) name;
  sem_destroy(p);
  free(p);
#endif
}
82
83static void buffer_init(buffer_t * b)
84{
85 b->data = create_semaphore(DATA_SEMAPHORE_NAME, 0);
86 b->free = create_semaphore(FREE_SEMAPHORE_NAME, BUFFER_MAX);
bart6fd7d742008-12-17 19:21:17 +000087
88 pthread_mutex_init(&b->mutex_in, NULL);
89 pthread_mutex_init(&b->mutex_out, NULL);
90
91 b->in = 0;
92 b->out = 0;
93}
94
bart6e38cb22009-07-23 18:22:00 +000095static void buffer_recv(buffer_t* b, data_t* d)
bart6fd7d742008-12-17 19:21:17 +000096{
bartd94169a2008-12-21 17:16:03 +000097 int out;
bart6e38cb22009-07-23 18:22:00 +000098 sem_wait(b->data);
bart03225a82008-12-21 17:19:05 +000099 if (use_locking)
100 pthread_mutex_lock(&b->mutex_out);
bartd94169a2008-12-21 17:16:03 +0000101 out = fetch_and_add(&b->out, 1);
102 if (out >= BUFFER_MAX)
103 {
104 fetch_and_add(&b->out, -BUFFER_MAX);
105 out -= BUFFER_MAX;
106 }
107 *d = b->buffer[out];
bart03225a82008-12-21 17:19:05 +0000108 if (use_locking)
109 pthread_mutex_unlock(&b->mutex_out);
bartd94169a2008-12-21 17:16:03 +0000110 if (! quiet)
111 {
112 printf("received %d from buffer[%d]\n", *d, out);
113 fflush(stdout);
114 }
bart6e38cb22009-07-23 18:22:00 +0000115 sem_post(b->free);
bart6fd7d742008-12-17 19:21:17 +0000116}
117
bart6e38cb22009-07-23 18:22:00 +0000118static void buffer_send(buffer_t* b, data_t* d)
bart6fd7d742008-12-17 19:21:17 +0000119{
bartd94169a2008-12-21 17:16:03 +0000120 int in;
bart6e38cb22009-07-23 18:22:00 +0000121 sem_wait(b->free);
bart03225a82008-12-21 17:19:05 +0000122 if (use_locking)
123 pthread_mutex_lock(&b->mutex_in);
bartd94169a2008-12-21 17:16:03 +0000124 in = fetch_and_add(&b->in, 1);
125 if (in >= BUFFER_MAX)
126 {
127 fetch_and_add(&b->in, -BUFFER_MAX);
128 in -= BUFFER_MAX;
129 }
130 b->buffer[in] = *d;
bart03225a82008-12-21 17:19:05 +0000131 if (use_locking)
132 pthread_mutex_unlock(&b->mutex_in);
bartd94169a2008-12-21 17:16:03 +0000133 if (! quiet)
134 {
135 printf("sent %d to buffer[%d]\n", *d, in);
136 fflush(stdout);
137 }
bart6e38cb22009-07-23 18:22:00 +0000138 sem_post(b->data);
bart6fd7d742008-12-17 19:21:17 +0000139}
140
bart6e38cb22009-07-23 18:22:00 +0000141static void buffer_destroy(buffer_t* b)
bart6fd7d742008-12-17 19:21:17 +0000142{
bart6e38cb22009-07-23 18:22:00 +0000143 destroy_semaphore(DATA_SEMAPHORE_NAME, b->data);
144 destroy_semaphore(FREE_SEMAPHORE_NAME, b->free);
bart6fd7d742008-12-17 19:21:17 +0000145
146 pthread_mutex_destroy(&b->mutex_in);
147 pthread_mutex_destroy(&b->mutex_out);
148}
149
bart6e38cb22009-07-23 18:22:00 +0000150static buffer_t b;
bart6fd7d742008-12-17 19:21:17 +0000151
bart6e38cb22009-07-23 18:22:00 +0000152static void producer(int* id)
bart6fd7d742008-12-17 19:21:17 +0000153{
154 buffer_send(&b, id);
155 pthread_exit(NULL);
156}
157
158#define MAXSLEEP (100 * 1000)
159
bart6e38cb22009-07-23 18:22:00 +0000160static void consumer(int* id)
bart6fd7d742008-12-17 19:21:17 +0000161{
162 int d;
163 usleep(rand() % MAXSLEEP);
164 buffer_recv(&b, &d);
165 if (! quiet)
bartd94169a2008-12-21 17:16:03 +0000166 {
bart6fd7d742008-12-17 19:21:17 +0000167 printf("%i: %i\n", *id, d);
bartd94169a2008-12-21 17:16:03 +0000168 fflush(stdout);
169 }
bart6fd7d742008-12-17 19:21:17 +0000170 pthread_exit(NULL);
171}
172
173#define THREADS (10)
174
175int main(int argc, char** argv)
176{
177 pthread_t producers[THREADS];
178 pthread_t consumers[THREADS];
179 int thread_arg[THREADS];
180 int i;
181 int optchar;
182
bart03225a82008-12-21 17:19:05 +0000183 while ((optchar = getopt(argc, argv, "nq")) != EOF)
bart6fd7d742008-12-17 19:21:17 +0000184 {
185 switch (optchar)
186 {
bart03225a82008-12-21 17:19:05 +0000187 case 'n':
188 use_locking = 0;
189 break;
bart6fd7d742008-12-17 19:21:17 +0000190 case 'q':
191 quiet = 1;
192 break;
193 }
194 }
195
196 srand(time(NULL));
197
198 buffer_init(&b);
199
200 for (i = 0; i < THREADS; ++i)
201 {
202 thread_arg[i] = i;
203 pthread_create(producers + i, NULL,
204 (void * (*)(void *)) producer, &thread_arg[i]);
205 }
206
207 for (i = 0; i < THREADS; ++i)
208 pthread_create(consumers + i, NULL,
209 (void * (*)(void *)) consumer, &thread_arg[i]);
210
211 for (i = 0; i < THREADS; ++i)
212 {
213 pthread_join(producers[i], NULL);
214 pthread_join(consumers[i], NULL);
215 }
216
217 buffer_destroy(&b);
218
219 return 0;
220}