/* pthreadpool: portable thread pool implementation (public API in pthreadpool.h). */
/* Standard C headers */
#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>

/* POSIX headers */
#include <pthread.h>
#include <unistd.h>

/* Library header */
#include <pthreadpool.h>

#define PTHREADPOOL_CACHELINE_SIZE 64
#define PTHREADPOOL_CACHELINE_ALIGNED __attribute__((__aligned__(PTHREADPOOL_CACHELINE_SIZE)))
#define PTHREADPOOL_STATIC_ASSERT(predicate, message) _Static_assert((predicate), message)

/**
 * State of a worker thread.
 * Written by the master thread and read by workers; workers sleep on
 * pthreadpool::state_condvar while the state is thread_state_idle.
 */
enum thread_state {
	/* No work assigned; the worker waits on state_condvar. */
	thread_state_idle,
	/* Process (and possibly steal) items of a 1D compute range. */
	thread_state_compute_1d,
	/* Leave the worker loop and terminate the thread. */
	thread_state_shutdown,
};

/**
 * Per-worker control block. Cache-line aligned (and asserted to be a whole
 * number of cache lines below) so that adjacent workers' hot counters do not
 * share a cache line.
 */
struct PTHREADPOOL_CACHELINE_ALIGNED thread_info {
	/**
	 * Index of the first element in the work range.
	 * Before processing a new element the owning worker thread increments this value.
	 */
	volatile size_t range_start;
	/**
	 * Index of the element after the last element of the work range.
	 * Before processing a new element the stealing worker thread decrements this value.
	 */
	volatile size_t range_end;
	/**
	 * The number of elements in the work range.
	 * Due to race conditions range_length <= range_end - range_start.
	 * The owning worker thread must decrement this value before incrementing @a range_start.
	 * The stealing worker thread must decrement this value before decrementing @a range_end.
	 */
	volatile size_t range_length;
	/**
	 * The active state of the thread.
	 */
	volatile enum thread_state state;
	/**
	 * Thread number in the 0..threads_count-1 range.
	 */
	size_t thread_number;
	/**
	 * The pthread object corresponding to the thread.
	 */
	pthread_t thread_object;
	/**
	 * Condition variable used to wake up the thread.
	 * When the thread is idle, it waits on this condition variable.
	 * NOTE(review): no code in this file waits on or signals this condvar;
	 * workers actually sleep on pthreadpool::state_condvar — confirm whether
	 * this field is vestigial.
	 */
	pthread_cond_t wakeup_condvar;
};

PTHREADPOOL_STATIC_ASSERT(sizeof(struct thread_info) % PTHREADPOOL_CACHELINE_SIZE == 0, "thread_info structure must occupy an integer number of cache lines (64 bytes)");

/**
 * Thread pool control structure.
 * Allocated cache-line aligned via posix_memalign; the thread_info array is a
 * flexible array member that immediately follows this structure — thread_main
 * relies on that layout to recover the pool pointer from a thread_info pointer.
 */
struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
	/**
	 * The number of threads that signalled completion of an operation.
	 */
	volatile size_t checkedin_threads;
	/**
	 * The function to call for each item.
	 */
	volatile pthreadpool_function_1d_t function;
	/**
	 * The first argument to the item processing function.
	 */
	void *volatile argument;
	/**
	 * Serializes concurrent calls to @a pthreadpool_compute_* from different threads.
	 */
	pthread_mutex_t execution_mutex;
	/**
	 * Guards access to the @a checkedin_threads variable.
	 */
	pthread_mutex_t barrier_mutex;
	/**
	 * Condition variable to wait until all threads check in.
	 */
	pthread_cond_t barrier_condvar;
	/**
	 * Guards access to the @a state variables.
	 */
	pthread_mutex_t state_mutex;
	/**
	 * Condition variable to wait for change of @a state variable.
	 */
	pthread_cond_t state_condvar;
	/**
	 * The number of threads in the thread pool. Never changes after initialization.
	 */
	size_t threads_count;
	/**
	 * Thread information structures that immediately follow this structure.
	 */
	struct thread_info threads[];
};

PTHREADPOOL_STATIC_ASSERT(sizeof(struct pthreadpool) % PTHREADPOOL_CACHELINE_SIZE == 0, "pthreadpool structure must occupy an integer number of cache lines (64 bytes)");

109static void checkin_worker_thread(struct pthreadpool* threadpool) {
110 pthread_mutex_lock(&threadpool->barrier_mutex);
111 const size_t checkedin_threads = threadpool->checkedin_threads + 1;
112 threadpool->checkedin_threads = checkedin_threads;
113 if (checkedin_threads == threadpool->threads_count) {
114 pthread_cond_signal(&threadpool->barrier_condvar);
115 }
116 pthread_mutex_unlock(&threadpool->barrier_mutex);
117}
118
119static void wait_worker_threads(struct pthreadpool* threadpool) {
120 if (threadpool->checkedin_threads != threadpool->threads_count) {
121 pthread_mutex_lock(&threadpool->barrier_mutex);
122 while (threadpool->checkedin_threads != threadpool->threads_count) {
123 pthread_cond_wait(&threadpool->barrier_condvar, &threadpool->barrier_mutex);
124 };
125 pthread_mutex_unlock(&threadpool->barrier_mutex);
126 }
127}
128
129static void wakeup_worker_threads(struct pthreadpool* threadpool) {
130 pthread_mutex_lock(&threadpool->state_mutex);
131 threadpool->checkedin_threads = 0; /* Locking of barrier_mutex not needed: readers are sleeping */
132 pthread_cond_broadcast(&threadpool->state_condvar);
133 pthread_mutex_unlock(&threadpool->state_mutex); /* Do wake up */
134}
135
/**
 * Atomically decrements *value unless it is already zero.
 *
 * @return true if the decrement succeeded, false if *value was observed as 0.
 */
inline static bool atomic_decrement(volatile size_t* value) {
	size_t observed = *value;
	while (observed != 0) {
		const size_t prior = __sync_val_compare_and_swap(value, observed, observed - 1);
		if (prior == observed) {
			/* CAS succeeded: we own the decremented slot. */
			return true;
		}
		/* Lost the race; retry with the freshly observed value. */
		observed = prior;
	}
	return false;
}

/**
 * Runs the 1D compute job on the calling worker thread.
 *
 * Protocol: the owner consumes its range from the front (range_start),
 * stealers consume from the back (range_end); range_length is the shared
 * ticket counter that both sides must win (via atomic_decrement) before
 * touching an item, so each item is processed exactly once.
 */
static void thread_compute_1d(struct pthreadpool* threadpool, struct thread_info* thread) {
	const pthreadpool_function_1d_t function = threadpool->function;
	void *const argument = threadpool->argument;
	/* Process thread's own range of items */
	size_t range_start = thread->range_start;
	while (atomic_decrement(&thread->range_length)) {
		function(argument, range_start++);
	}
	/* Done, now look for other threads' items to steal */
	const size_t thread_number = thread->thread_number;
	const size_t threads_count = threadpool->threads_count;
	/* Scan the other workers round-robin, starting just after ourselves. */
	for (size_t tid = (thread_number + 1) % threads_count; tid != thread_number; tid = (tid + 1) % threads_count) {
		struct thread_info* other_thread = &threadpool->threads[tid];
		if (other_thread->state != thread_state_idle) {
			while (atomic_decrement(&other_thread->range_length)) {
				/* Steal from the back so we don't collide with the owner. */
				const size_t item_id = __sync_sub_and_fetch(&other_thread->range_end, 1);
				function(argument, item_id);
			}
		}
	}
}

/**
 * Entry point of every worker thread.
 *
 * @param arg  pointer to this worker's thread_info slot inside
 *             pthreadpool::threads[].
 */
static void* thread_main(void* arg) {
	struct thread_info* thread = (struct thread_info*) arg;
	/* Recover the owning pool: step back thread_number elements to reach
	 * threads[0], then one struct pthreadpool to reach the pool itself.
	 * Relies on threads[] being the flexible array that immediately follows
	 * struct pthreadpool. */
	struct pthreadpool* threadpool = ((struct pthreadpool*) (thread - thread->thread_number)) - 1;

	/* Check in */
	checkin_worker_thread(threadpool);

	/* Monitor the state changes and act accordingly */
	for (;;) {
		/* Lock the state mutex */
		pthread_mutex_lock(&threadpool->state_mutex);
		/* Read the state */
		enum thread_state state;
		while ((state = thread->state) == thread_state_idle) {
			/* Wait for state change */
			pthread_cond_wait(&threadpool->state_condvar, &threadpool->state_mutex);
		}
		/* Read non-idle state */
		pthread_mutex_unlock(&threadpool->state_mutex);
		switch (state) {
			case thread_state_compute_1d:
				thread_compute_1d(threadpool, thread);
				break;
			case thread_state_shutdown:
				/* Exit without checking in; pthreadpool_destroy joins us. */
				return NULL;
			case thread_state_idle:
				/* To inhibit compiler warning */
				break;
		}
		/* Notify the master thread that we finished processing */
		thread->state = thread_state_idle;
		checkin_worker_thread(threadpool);
	};
}

206struct pthreadpool* pthreadpool_create(size_t threads_count) {
207 if (threads_count == 0) {
208 threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN);
209 }
Marat Dukhan3a45d9a2015-08-23 22:25:19 -0400210 struct pthreadpool* threadpool = NULL;
211 posix_memalign((void**) &threadpool, 64, sizeof(struct pthreadpool) + threads_count * sizeof(struct thread_info));
Marat Dukhan0a312192015-08-22 17:46:29 -0400212 memset(threadpool, 0, sizeof(struct pthreadpool) + threads_count * sizeof(struct thread_info));
213 threadpool->threads_count = threads_count;
214 pthread_mutex_init(&threadpool->execution_mutex, NULL);
215 pthread_mutex_init(&threadpool->barrier_mutex, NULL);
216 pthread_cond_init(&threadpool->barrier_condvar, NULL);
217 pthread_mutex_init(&threadpool->state_mutex, NULL);
218 pthread_cond_init(&threadpool->state_condvar, NULL);
219
220 for (size_t tid = 0; tid < threads_count; tid++) {
221 threadpool->threads[tid].thread_number = tid;
222 pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]);
223 }
224
225 /* Wait until all threads initialize */
226 wait_worker_threads(threadpool);
227 return threadpool;
228}
229
Marat Dukhan7b1f6e52015-08-25 11:24:08 -0400230size_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) {
Marat Dukhan0a312192015-08-22 17:46:29 -0400231 return threadpool->threads_count;
232}
233
/**
 * Computes (a * b) / d without overflowing the intermediate product, by
 * widening to twice the width of size_t.
 *
 * Fix: in the original, the (size_t) cast bound to the widened product
 * *before* the division — `(size_t)(wide_product) / d` — truncating the
 * product back to size_t width and defeating the widening entirely whenever
 * a * b overflowed size_t. The division is now performed in the wide type
 * and only the final quotient is narrowed.
 */
static inline size_t multiply_divide(size_t a, size_t b, size_t d) {
	#if defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 4)
		const uint64_t product = (uint64_t) a * (uint64_t) b;
		return (size_t) (product / (uint64_t) d);
	#elif defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 8)
		const __uint128_t product = (__uint128_t) a * (__uint128_t) b;
		return (size_t) (product / (__uint128_t) d);
	#else
		#error "Unsupported platform"
	#endif
}

244void pthreadpool_compute_1d(
245 struct pthreadpool* threadpool,
246 pthreadpool_function_1d_t function,
247 void* argument,
248 size_t items)
249{
250 /* Protect the global threadpool structures */
251 pthread_mutex_lock(&threadpool->execution_mutex);
252
253 /* Spread the work between threads */
254 for (size_t tid = 0; tid < threadpool->threads_count; tid++) {
255 struct thread_info* thread = &threadpool->threads[tid];
256 thread->range_start = multiply_divide(items, tid, threadpool->threads_count);
257 thread->range_end = multiply_divide(items, tid + 1, threadpool->threads_count);
258 thread->range_length = thread->range_end - thread->range_start;
259 thread->state = thread_state_compute_1d;
260 }
261
262 /* Setup global arguments */
263 threadpool->function = function;
264 threadpool->argument = argument;
265
266 /* Wake up the threads */
267 wakeup_worker_threads(threadpool);
268
269 /* Wait until the threads finish computation */
270 wait_worker_threads(threadpool);
271
272 /* Unprotect the global threadpool structures */
273 pthread_mutex_unlock(&threadpool->execution_mutex);
274}
275
276void pthreadpool_destroy(struct pthreadpool* threadpool) {
277 /* Update threads' states */
278 for (size_t tid = 0; tid < threadpool->threads_count; tid++) {
279 threadpool->threads[tid].state = thread_state_shutdown;
280 }
281
282 /* Wake up the threads */
283 wakeup_worker_threads(threadpool);
284
285 /* Wait until all threads return */
286 for (size_t tid = 0; tid < threadpool->threads_count; tid++) {
287 pthread_join(threadpool->threads[tid].thread_object, NULL);
288 }
289
290 /* Release resources */
291 pthread_mutex_destroy(&threadpool->execution_mutex);
292 pthread_mutex_destroy(&threadpool->barrier_mutex);
293 pthread_cond_destroy(&threadpool->barrier_condvar);
294 pthread_mutex_destroy(&threadpool->state_mutex);
295 pthread_cond_destroy(&threadpool->state_condvar);
296 free(threadpool);
297}