blob: b7ab5641644cd9abcad01211edb1fc5612553433 [file] [log] [blame]
/* Test program that performs producer-consumer style communication through
 * a circular buffer. This test program is a slightly modified version of the
 * test program made available by Miguel Ojeda
 * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782.
 */
6
7
8#include <stdio.h>
9#include <string.h>
10#include <stdlib.h>
11#include <unistd.h>
12#include <time.h>
13#include <pthread.h>
14#include <semaphore.h>
tomd759a832010-04-29 09:36:35 +000015#include <fcntl.h>
bartd94169a2008-12-21 17:16:03 +000016#include "../../config.h"
17
18
19/** gcc versions 4.1.0 and later have support for atomic builtins. */
bartd45d9952009-05-31 18:53:54 +000020
21#ifndef HAVE_BUILTIN_ATOMIC
22#error Sorry, but this test program can only be compiled by a compiler that\
23has built-in functions for atomic memory access.
24#endif
bartd94169a2008-12-21 17:16:03 +000025
bart6fd7d742008-12-17 19:21:17 +000026
/* Number of slots in the circular buffer. */
#define BUFFER_MAX (2)
/* Base names handed to create_semaphore() / destroy_semaphore(). */
#define DATA_SEMAPHORE_NAME "cb-data-semaphore"
#define FREE_SEMAPHORE_NAME "cb-free-semaphore"


/* Type of the items stored in the circular buffer. */
typedef int data_t;

/* Circular buffer shared between the producer and the consumer threads. */
typedef struct {
  sem_t *data;               /* Counts the data items currently stored. */
  sem_t *free;               /* Counts the slots that are still free. */
  int in;                    /* Position where the next element is written. */
  int out;                   /* Position from where the next element is removed. */
  pthread_mutex_t mutex_in;  /* Protects 'in'. */
  pthread_mutex_t mutex_out; /* Protects 'out'. */
  data_t buffer[BUFFER_MAX]; /* The stored elements. */
} buffer_t;
50
/* Non-zero suppresses the progress output; set by the -q option. */
static int quiet;
/* Non-zero makes send/receive take the index mutexes; cleared by -n. */
static int use_locking = 1;
bart6fd7d742008-12-17 19:21:17 +000053
/*
 * Atomically add 'i' to '*p' by means of the compiler's
 * __sync_fetch_and_add() builtin.
 *
 * Returns the value '*p' held before the addition.
 */
static __inline__
int fetch_and_add(int* p, int i)
{
  const int previous = __sync_fetch_and_add(p, i);

  return previous;
}
59
/*
 * Create a counting semaphore with initial count 'value'.
 *
 * When VGO_darwin is defined a named semaphore called "<name>-<pid>" is
 * opened with sem_open(); otherwise an unnamed semaphore is allocated on the
 * heap and initialised with sem_init() ('name' is unused in that case).
 *
 * Returns the semaphore, or NULL if it could not be created. A diagnostic is
 * written to stderr on failure. Release the result with destroy_semaphore().
 */
static sem_t* create_semaphore(const char* const name, const int value)
{
#ifdef VGO_darwin
  char name_and_pid[32];
  sem_t* p;

  /* The process ID keeps concurrently running instances from colliding. */
  snprintf(name_and_pid, sizeof(name_and_pid), "%s-%d", name, (int) getpid());
  p = sem_open(name_and_pid, O_CREAT | O_EXCL, 0600, value);
  if (p == SEM_FAILED) {
    perror("sem_open");
    return NULL;
  }
  return p;
#else
  sem_t* p = malloc(sizeof(*p));

  (void) name;
  if (p == NULL) {
    perror("malloc");
    return NULL;
  }
  /* Never hand an uninitialised semaphore back to the caller. */
  if (sem_init(p, 0, value) != 0) {
    perror("sem_init");
    free(p);
    return NULL;
  }
  return p;
#endif
}
78
/*
 * Release a semaphore obtained from create_semaphore().
 *
 * 'name' must be the same base name that was passed to create_semaphore().
 * When VGO_darwin is defined the semaphore was opened as "<name>-<pid>", so
 * that same suffixed name is rebuilt here before unlinking it; unlinking the
 * bare base name would leave the named semaphore behind.
 *
 * Passing NULL for 'p' is a no-op.
 */
static void destroy_semaphore(const char* const name, sem_t* p)
{
  if (p == NULL)
    return;

#ifdef VGO_darwin
  {
    char name_and_pid[32];

    /* Must match the name construction in create_semaphore(). */
    snprintf(name_and_pid, sizeof(name_and_pid), "%s-%d", name,
             (int) getpid());
    sem_close(p);
    sem_unlink(name_and_pid);
  }
#else
  (void) name;
  sem_destroy(p);
  free(p);
#endif
}
89
90static void buffer_init(buffer_t * b)
91{
92 b->data = create_semaphore(DATA_SEMAPHORE_NAME, 0);
93 b->free = create_semaphore(FREE_SEMAPHORE_NAME, BUFFER_MAX);
bart6fd7d742008-12-17 19:21:17 +000094
95 pthread_mutex_init(&b->mutex_in, NULL);
96 pthread_mutex_init(&b->mutex_out, NULL);
97
98 b->in = 0;
99 b->out = 0;
100}
101
bart6e38cb22009-07-23 18:22:00 +0000102static void buffer_recv(buffer_t* b, data_t* d)
bart6fd7d742008-12-17 19:21:17 +0000103{
bartd94169a2008-12-21 17:16:03 +0000104 int out;
bart6e38cb22009-07-23 18:22:00 +0000105 sem_wait(b->data);
bart03225a82008-12-21 17:19:05 +0000106 if (use_locking)
107 pthread_mutex_lock(&b->mutex_out);
bartd94169a2008-12-21 17:16:03 +0000108 out = fetch_and_add(&b->out, 1);
109 if (out >= BUFFER_MAX)
110 {
111 fetch_and_add(&b->out, -BUFFER_MAX);
112 out -= BUFFER_MAX;
113 }
114 *d = b->buffer[out];
bart03225a82008-12-21 17:19:05 +0000115 if (use_locking)
116 pthread_mutex_unlock(&b->mutex_out);
bartd94169a2008-12-21 17:16:03 +0000117 if (! quiet)
118 {
119 printf("received %d from buffer[%d]\n", *d, out);
120 fflush(stdout);
121 }
bart6e38cb22009-07-23 18:22:00 +0000122 sem_post(b->free);
bart6fd7d742008-12-17 19:21:17 +0000123}
124
bart6e38cb22009-07-23 18:22:00 +0000125static void buffer_send(buffer_t* b, data_t* d)
bart6fd7d742008-12-17 19:21:17 +0000126{
bartd94169a2008-12-21 17:16:03 +0000127 int in;
bart6e38cb22009-07-23 18:22:00 +0000128 sem_wait(b->free);
bart03225a82008-12-21 17:19:05 +0000129 if (use_locking)
130 pthread_mutex_lock(&b->mutex_in);
bartd94169a2008-12-21 17:16:03 +0000131 in = fetch_and_add(&b->in, 1);
132 if (in >= BUFFER_MAX)
133 {
134 fetch_and_add(&b->in, -BUFFER_MAX);
135 in -= BUFFER_MAX;
136 }
137 b->buffer[in] = *d;
bart03225a82008-12-21 17:19:05 +0000138 if (use_locking)
139 pthread_mutex_unlock(&b->mutex_in);
bartd94169a2008-12-21 17:16:03 +0000140 if (! quiet)
141 {
142 printf("sent %d to buffer[%d]\n", *d, in);
143 fflush(stdout);
144 }
bart6e38cb22009-07-23 18:22:00 +0000145 sem_post(b->data);
bart6fd7d742008-12-17 19:21:17 +0000146}
147
bart6e38cb22009-07-23 18:22:00 +0000148static void buffer_destroy(buffer_t* b)
bart6fd7d742008-12-17 19:21:17 +0000149{
bart6e38cb22009-07-23 18:22:00 +0000150 destroy_semaphore(DATA_SEMAPHORE_NAME, b->data);
151 destroy_semaphore(FREE_SEMAPHORE_NAME, b->free);
bart6fd7d742008-12-17 19:21:17 +0000152
153 pthread_mutex_destroy(&b->mutex_in);
154 pthread_mutex_destroy(&b->mutex_out);
155}
156
bart6e38cb22009-07-23 18:22:00 +0000157static buffer_t b;
bart6fd7d742008-12-17 19:21:17 +0000158
bart6e38cb22009-07-23 18:22:00 +0000159static void producer(int* id)
bart6fd7d742008-12-17 19:21:17 +0000160{
161 buffer_send(&b, id);
162 pthread_exit(NULL);
163}
164
165#define MAXSLEEP (100 * 1000)
166
bart6e38cb22009-07-23 18:22:00 +0000167static void consumer(int* id)
bart6fd7d742008-12-17 19:21:17 +0000168{
169 int d;
170 usleep(rand() % MAXSLEEP);
171 buffer_recv(&b, &d);
172 if (! quiet)
bartd94169a2008-12-21 17:16:03 +0000173 {
bart6fd7d742008-12-17 19:21:17 +0000174 printf("%i: %i\n", *id, d);
bartd94169a2008-12-21 17:16:03 +0000175 fflush(stdout);
176 }
bart6fd7d742008-12-17 19:21:17 +0000177 pthread_exit(NULL);
178}
179
180#define THREADS (10)
181
182int main(int argc, char** argv)
183{
184 pthread_t producers[THREADS];
185 pthread_t consumers[THREADS];
186 int thread_arg[THREADS];
187 int i;
188 int optchar;
189
bart03225a82008-12-21 17:19:05 +0000190 while ((optchar = getopt(argc, argv, "nq")) != EOF)
bart6fd7d742008-12-17 19:21:17 +0000191 {
192 switch (optchar)
193 {
bart03225a82008-12-21 17:19:05 +0000194 case 'n':
195 use_locking = 0;
196 break;
bart6fd7d742008-12-17 19:21:17 +0000197 case 'q':
198 quiet = 1;
199 break;
200 }
201 }
202
203 srand(time(NULL));
204
205 buffer_init(&b);
206
207 for (i = 0; i < THREADS; ++i)
208 {
209 thread_arg[i] = i;
210 pthread_create(producers + i, NULL,
211 (void * (*)(void *)) producer, &thread_arg[i]);
212 }
213
214 for (i = 0; i < THREADS; ++i)
215 pthread_create(consumers + i, NULL,
216 (void * (*)(void *)) consumer, &thread_arg[i]);
217
218 for (i = 0; i < THREADS; ++i)
219 {
220 pthread_join(producers[i], NULL);
221 pthread_join(consumers[i], NULL);
222 }
223
224 buffer_destroy(&b);
225
226 return 0;
227}