blob: 30f15955ec504969be504de61257dd368f7a1db3 [file] [log] [blame]
/********************************************************
 * An example source module to accompany...
 *
 * "Using POSIX Threads: Programming with Pthreads"
 *     by Brad Nichols, Dick Buttlar, Jackie Farrell
 *     O'Reilly & Associates, Inc.
 *
 ********************************************************
 * tpool.c --
 *
 * Example thread pooling library
 */
13
14#include <stdlib.h>
15#include <stdio.h>
16#include <unistd.h>
17#include <sys/types.h>
18#include <string.h>
19
20#include <pthread.h>
21
22
/********************************************************
 * An example source module to accompany...
 *
 * "Using POSIX Threads: Programming with Pthreads"
 *     by Brad Nichols, Dick Buttlar, Jackie Farrell
 *     O'Reilly & Associates, Inc.
 *
 ********************************************************
 * tpool.h --
 *
 * Structures for thread pool
 */
35
/*
 * One pending unit of work in the pool's singly linked queue.
 */
struct tpool_work {
    void (*routine)();          /* callback a worker invokes as routine(arg) */
    void *arg;                  /* sole argument handed to routine */
    struct tpool_work *next;    /* following queue entry; NULL for the tail */
};

typedef struct tpool_work tpool_work_t;
41
42typedef struct tpool {
43 /* pool characteristics */
44 int num_threads;
45 int max_queue_size;
46 int do_not_block_when_full;
47 /* pool state */
48 pthread_t *threads;
49 int cur_queue_size;
50 tpool_work_t *queue_head;
51 tpool_work_t *queue_tail;
52 int queue_closed;
53 int shutdown;
54 /* pool synchronization */
55 pthread_mutex_t queue_lock;
56 pthread_cond_t queue_not_empty;
57 pthread_cond_t queue_not_full;
58 pthread_cond_t queue_empty;
59} *tpool_t;
60
61void tpool_init(
62 tpool_t *tpoolp,
63 int num_threads,
64 int max_queue_size,
65 int do_not_block_when_full);
66
67int tpool_add_work(
68 tpool_t tpool,
69 void (*routine)(),
70 void *arg);
71
72int tpool_destroy(
73 tpool_t tpool,
74 int finish);
75
76
77/*-- end of tpool.h ----------------------------------*/
78
79
/* Worker thread entry point; arg is the tpool_t the worker serves. */
void *tpool_thread(void *arg);
81
82void tpool_init(tpool_t *tpoolp,
83 int num_worker_threads,
84 int max_queue_size,
85 int do_not_block_when_full)
86{
87 int i, rtn;
88 tpool_t tpool;
89
90 /* allocate a pool data structure */
91 if ((tpool = (tpool_t )malloc(sizeof(struct tpool))) == NULL)
92 perror("malloc"), exit(1);
93
94 /* initialize th fields */
95 tpool->num_threads = num_worker_threads;
96 tpool->max_queue_size = max_queue_size;
97 tpool->do_not_block_when_full = do_not_block_when_full;
98 if ((tpool->threads =
99 (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads))
100 == NULL)
101 perror("malloc"), exit(1);
102 tpool->cur_queue_size = 0;
103 tpool->queue_head = NULL;
104 tpool->queue_tail = NULL;
105 tpool->queue_closed = 0;
106 tpool->shutdown = 0;
107 if ((rtn = pthread_mutex_init(&(tpool->queue_lock), NULL)) != 0)
108 fprintf(stderr,"pthread_mutex_init %s\n",strerror(rtn)), exit(1);
109 if ((rtn = pthread_cond_init(&(tpool->queue_not_empty), NULL)) != 0)
110 fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(1);
111 if ((rtn = pthread_cond_init(&(tpool->queue_not_full), NULL)) != 0)
112 fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(1);
113 if ((rtn = pthread_cond_init(&(tpool->queue_empty), NULL)) != 0)
114 fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(1);
115
116 /* create threads */
117 for (i = 0; i != num_worker_threads; i++) {
118 if ((rtn = pthread_create( &(tpool->threads[i]),
119 NULL,
120 tpool_thread,
121 (void *)tpool)) != 0)
122 fprintf(stderr,"pthread_create %d\n",rtn), exit(1);
123 }
124
125 *tpoolp = tpool;
126}
127
128int tpool_add_work(
129 tpool_t tpool,
130 void (*routine)(),
131 void *arg)
132{
133 int rtn;
134 tpool_work_t *workp;
135
136 if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
137 fprintf(stderr,"pthread_mutex_lock %d\n",rtn), exit(1);
138
139 /* no space and this caller doesn't want to wait */
140 if ((tpool->cur_queue_size == tpool->max_queue_size) &&
141 tpool->do_not_block_when_full) {
142 if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
143 fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
144
145 return -1;
146 }
147
148 while( (tpool->cur_queue_size == tpool->max_queue_size) &&
149 (!(tpool->shutdown || tpool->queue_closed)) ) {
150
151 if ((rtn = pthread_cond_wait(&(tpool->queue_not_full),
152 &(tpool->queue_lock))) != 0)
153 fprintf(stderr,"pthread_cond_waitA %d\n",rtn), exit(1);
154
155 }
156
157 /* the pool is in the process of being destroyed */
158 if (tpool->shutdown || tpool->queue_closed) {
159 if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
160 fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
161
162 return -1;
163 }
164
165
166 /* allocate work structure */
167 if ((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL)
168 perror("malloc"), exit(1);
169 workp->routine = routine;
170 workp->arg = arg;
171 workp->next = NULL;
172
173 printf("adder: adding an item %d\n", workp->routine);
174
175 if (tpool->cur_queue_size == 0) {
176 tpool->queue_tail = tpool->queue_head = workp;
177
178 printf("adder: queue == 0, waking all workers\n");
179
180 if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_empty))) != 0)
181 fprintf(stderr,"pthread_cond_signal %d\n",rtn), exit(1);;
182 } else {
183 tpool->queue_tail->next = workp;
184 tpool->queue_tail = workp;
185 }
186
187 tpool->cur_queue_size++;
188 if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
189 fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
190 return 1;
191}
192
193int tpool_destroy(tpool_t tpool,
194 int finish)
195{
196 int i,rtn;
197 tpool_work_t *cur_nodep;
198
199
200 if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
201 fprintf(stderr,"pthread_mutex_lock %d\n",rtn), exit(1);
202
203 /* Is a shutdown already in progress? */
204 if (tpool->queue_closed || tpool->shutdown) {
205 if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
206 fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
207 return 0;
208 }
209
210 tpool->queue_closed = 1;
211
212 /* If the finish flag is set, wait for workers to
213 drain queue */
214 if (finish == 1) {
215 while (tpool->cur_queue_size != 0) {
216 if ((rtn = pthread_cond_wait(&(tpool->queue_empty),
217 &(tpool->queue_lock))) != 0)
218 fprintf(stderr,"pthread_cond_waitB %d\n",rtn), exit(1);
219 }
220 }
221
222 tpool->shutdown = 1;
223
224 if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
225 fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
226
227
228 /* Wake up any workers so they recheck shutdown flag */
229 if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_empty))) != 0)
230 fprintf(stderr,"pthread_cond_broadcast %d\n",rtn), exit(1);
231 if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_full))) != 0)
232 fprintf(stderr,"pthread_cond_broadcast %d\n",rtn), exit(1);
233
234
235 /* Wait for workers to exit */
236 for(i=0; i < tpool->num_threads; i++) {
237 if ((rtn = pthread_join(tpool->threads[i],NULL)) != 0)
238 fprintf(stderr,"pthread_join %d\n",rtn), exit(1);
239 }
240
241 /* Now free pool structures */
242 free(tpool->threads);
243 while(tpool->queue_head != NULL) {
244 cur_nodep = tpool->queue_head->next;
245 tpool->queue_head = tpool->queue_head->next;
246 free(cur_nodep);
247 }
248 free(tpool);
249}
250
251void *tpool_thread(void *arg)
252{
253 tpool_t tpool = (tpool_t)arg;
254 int rtn;
255 tpool_work_t *my_workp;
256
257 for(;;) {
258
259
260
261 /* Check queue for work */
262 if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
263 fprintf(stderr,"pthread_mutex_lock %d\n",rtn), exit(1);
264
265 while ((tpool->cur_queue_size == 0) && (!tpool->shutdown)) {
266
267
268 printf("worker %d: I'm sleeping again\n", pthread_self());
269
270 if ((rtn = pthread_cond_wait(&(tpool->queue_not_empty),
271 &(tpool->queue_lock))) != 0)
272 fprintf(stderr,"pthread_cond_waitC %d\n",rtn), exit(1);
273
274 }
275 sleep(1);
276
277 printf("worker %d: I'm awake\n", pthread_self());
278
279 /* Has a shutdown started while i was sleeping? */
280 if (tpool->shutdown == 1) {
281
282 if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
283 fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
284 pthread_exit(NULL);
285 }
286
287
288 /* Get to work, dequeue the next item */
289 my_workp = tpool->queue_head;
290 tpool->cur_queue_size--;
291 if (tpool->cur_queue_size == 0)
292 tpool->queue_head = tpool->queue_tail = NULL;
293 else
294 tpool->queue_head = my_workp->next;
295
296 printf("worker %d: dequeing item %d\n", pthread_self(), my_workp->next);
297
298 /* Handle waiting add_work threads */
299 if ((!tpool->do_not_block_when_full) &&
300 (tpool->cur_queue_size == (tpool->max_queue_size - 1)))
301
302 if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_full))) != 0)
303 fprintf(stderr,"pthread_cond_broadcast %d\n",rtn), exit(1);
304
305 /* Handle waiting destroyer threads */
306 if (tpool->cur_queue_size == 0)
307
308 if ((rtn = pthread_cond_signal(&(tpool->queue_empty))) != 0)
309 fprintf(stderr,"pthread_cond_signal %d\n",rtn), exit(1);
310
311 if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
312 fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
313
314 /* Do this work item */
315 (*(my_workp->routine))(my_workp->arg);
316 free(my_workp);
317 }
318 return(NULL);
319}
320
321
/********************************************************
 * An example source module to accompany...
 *
 * "Using POSIX Threads: Programming with Pthreads"
 *     by Brad Nichols, Dick Buttlar, Jackie Farrell
 *     O'Reilly & Associates, Inc.
 *
 ********************************************************
 * tpool.c --
 *
 * Example caller for thread pooling library
 */
334
/* Labels passed to r1() as work-item arguments; main() queues the first five. */
char *s1[20] = {
    "STRING 0",  "STRING 1",  "STRING 2",  "STRING 3",
    "STRING 4",  "STRING 5",  "STRING 6",  "STRING 7",
    "STRING 8",  "STRING 9",  "STRING 10", "STRING 11",
    "STRING 12", "STRING 13", "STRING 14", "STRING 15",
    "STRING 16", "STRING 17", "STRING 18", "STRING 19"
};
355
/*
 * r1 -- sample work routine: prints "<label> START", spins through a
 * million additions, then prints "<label> DONE".
 *
 *   printstring  label printed in both messages; not modified
 */
void r1(char * printstring)
{
    /* Starts at zero and is unsigned so the running sum is well defined
       even when it wraps; volatile keeps the compiler from discarding the
       otherwise unused loop (presumably there to burn some CPU time). */
    volatile unsigned long x = 0;
    int i;

    printf("%s START\n", printstring);

    for (i = 0; i < 1000000; i++) {
        x = x + (unsigned long)i;
    }

    printf("%s DONE\n", printstring);
}
368
369extern int
370main(void)
371{
372 extern char *s1[];
373
374 pthread_t t1,t2;
375 int i;
376
377 tpool_t test_pool;
378
379 tpool_init(&test_pool, 10, 20, 0);
380
381 sleep(1);
382
383 for ( i = 0; i < 5; i++) {
384 printf("tpool_add_work returned %d\n",
385 tpool_add_work(test_pool, r1, s1[i]));
386
387 }
388
389 printf("main: all work queued\n");
390
391 tpool_destroy(test_pool, 1);
392
393 return 0;
394
395}