blob: 8caf5a5904793105c01ad63c00c7796c2196ed3b [file] [log] [blame]
Marat Dukhan0a312192015-08-22 17:46:29 -04001/* Standard C headers */
2#include <stdint.h>
3#include <stdbool.h>
4#include <malloc.h>
5#include <string.h>
6#include <assert.h>
7
8/* POSIX headers */
9#include <pthread.h>
10#include <unistd.h>
11
12/* Library header */
13#include <pthreadpool.h>
14
15#define PTHREADPOOL_CACHELINE_SIZE 64
16#define PTHREADPOOL_CACHELINE_ALIGNED __attribute__((__aligned__(PTHREADPOOL_CACHELINE_SIZE)))
17#define PTHREADPOOL_STATIC_ASSERT(predicate, message) _Static_assert((predicate), message)
18
/**
 * The processing state of a worker thread, stored per-thread in
 * thread_info.state and read by the worker's main loop.
 */
enum thread_state {
	/* Thread sleeps on state_condvar until the master assigns work. */
	thread_state_idle,
	/* Thread processes (and possibly steals) items of a 1D computation. */
	thread_state_compute_1d,
	/* Thread leaves its main loop and terminates. */
	thread_state_shutdown,
};
24
/**
 * Per-worker bookkeeping. Aligned and (per the static assert below) padded to
 * a whole number of cache lines so that concurrent updates to different
 * workers' ranges never contend on the same cache line.
 */
struct PTHREADPOOL_CACHELINE_ALIGNED thread_info {
	/**
	 * Index of the first element in the work range.
	 * Before processing a new element the owning worker thread increments this value.
	 */
	volatile size_t range_start;
	/**
	 * Index of the element after the last element of the work range.
	 * Before processing a new element the stealing worker thread decrements this value.
	 */
	volatile size_t range_end;
	/**
	 * The number of elements in the work range.
	 * Due to race conditions range_length <= range_end - range_start.
	 * The owning worker thread must decrement this value before incrementing @a range_start.
	 * The stealing worker thread must decrement this value before decrementing @a range_end.
	 */
	volatile size_t range_length;
	/**
	 * The active state of the thread.
	 */
	volatile enum thread_state state;
	/**
	 * Thread number in the 0..threads_count-1 range.
	 * Used by thread_main to recover the owning pool from the thread_info pointer.
	 */
	size_t thread_number;
	/**
	 * The pthread object corresponding to the thread.
	 */
	pthread_t thread_object;
	/**
	 * Condition variable used to wake up the thread.
	 * When the thread is idle, it waits on this condition variable.
	 * NOTE(review): no code in this file waits on this condvar — workers
	 * sleep on threadpool->state_condvar instead; confirm whether this
	 * field is vestigial.
	 */
	pthread_cond_t wakeup_condvar;
};

PTHREADPOOL_STATIC_ASSERT(sizeof(struct thread_info) % PTHREADPOOL_CACHELINE_SIZE == 0, "thread_info structure must occupy an integer number of cache lines (64 bytes)");
63
/**
 * The thread pool object. The per-thread thread_info records are stored in
 * the flexible array member at the end, so one allocation holds everything.
 */
struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
	/**
	 * The number of threads that signalled completion of an operation.
	 * Guarded by barrier_mutex; reset to 0 by wakeup_worker_threads.
	 */
	volatile size_t checkedin_threads;
	/**
	 * The function to call for each item.
	 */
	volatile pthreadpool_function_1d_t function;
	/**
	 * The first argument to the item processing function.
	 */
	void *volatile argument;
	/**
	 * Serializes concurrent calls to @a pthreadpool_compute_* from different threads.
	 */
	pthread_mutex_t execution_mutex;
	/**
	 * Guards access to the @a checkedin_threads variable.
	 */
	pthread_mutex_t barrier_mutex;
	/**
	 * Condition variable to wait until all threads check in.
	 */
	pthread_cond_t barrier_condvar;
	/**
	 * Guards access to the @a state variables.
	 */
	pthread_mutex_t state_mutex;
	/**
	 * Condition variable to wait for change of @a state variable.
	 */
	pthread_cond_t state_condvar;
	/**
	 * The number of threads in the thread pool. Never changes after initialization.
	 */
	size_t threads_count;
	/**
	 * Thread information structures that immediately follow this structure.
	 */
	struct thread_info threads[];
};

PTHREADPOOL_STATIC_ASSERT(sizeof(struct pthreadpool) % PTHREADPOOL_CACHELINE_SIZE == 0, "pthreadpool structure must occupy an integer number of cache lines (64 bytes)");
108
109static void checkin_worker_thread(struct pthreadpool* threadpool) {
110 pthread_mutex_lock(&threadpool->barrier_mutex);
111 const size_t checkedin_threads = threadpool->checkedin_threads + 1;
112 threadpool->checkedin_threads = checkedin_threads;
113 if (checkedin_threads == threadpool->threads_count) {
114 pthread_cond_signal(&threadpool->barrier_condvar);
115 }
116 pthread_mutex_unlock(&threadpool->barrier_mutex);
117}
118
119static void wait_worker_threads(struct pthreadpool* threadpool) {
120 if (threadpool->checkedin_threads != threadpool->threads_count) {
121 pthread_mutex_lock(&threadpool->barrier_mutex);
122 while (threadpool->checkedin_threads != threadpool->threads_count) {
123 pthread_cond_wait(&threadpool->barrier_condvar, &threadpool->barrier_mutex);
124 };
125 pthread_mutex_unlock(&threadpool->barrier_mutex);
126 }
127}
128
/*
 * Resets the check-in counter and broadcasts on state_condvar so every
 * worker re-reads its state. Must only be called when all workers are idle
 * (sleeping on state_condvar) and after the caller has set each worker's
 * state — the counter reset and the broadcast happen under state_mutex so
 * no worker can wake and check in before the counter is zeroed.
 */
static void wakeup_worker_threads(struct pthreadpool* threadpool) {
	pthread_mutex_lock(&threadpool->state_mutex);
	threadpool->checkedin_threads = 0; /* Locking of barrier_mutex not needed: readers are sleeping */
	pthread_cond_broadcast(&threadpool->state_condvar);
	pthread_mutex_unlock(&threadpool->state_mutex); /* Do wake up */
}
135
/*
 * Attempts to atomically decrement *value, failing if it is already zero.
 * Returns true when the caller successfully claimed one unit (i.e. *value
 * was nonzero and has been decremented), false otherwise.
 */
inline static bool atomic_decrement(volatile size_t* value) {
	/* Snapshot the counter; retry the CAS from whatever value we last saw. */
	size_t observed = *value;
	while (observed != 0) {
		const size_t previous = __sync_val_compare_and_swap(value, observed, observed - 1);
		if (previous == observed) {
			/* CAS took effect: one unit claimed. */
			return true;
		}
		/* Lost the race to another thread; retry from its latest value. */
		observed = previous;
	}
	/* Counter hit zero before we could claim anything. */
	return false;
}
148
/*
 * Executes one 1D computation on the calling worker: first drains the
 * worker's own range from the front, then walks the other workers' ranges
 * and steals remaining items from their tails.
 */
static void thread_compute_1d(struct pthreadpool* threadpool, struct thread_info* thread) {
	const pthreadpool_function_1d_t function = threadpool->function;
	void *const argument = threadpool->argument;
	/* Process thread's own range of items */
	size_t range_start = thread->range_start;
	/* range_length is the shared claim counter: a successful decrement
	   grants the right to process exactly one item. */
	while (atomic_decrement(&thread->range_length)) {
		function(argument, range_start++);
	}
	/* Done, now look for other threads' items to steal */
	const size_t thread_number = thread->thread_number;
	const size_t threads_count = threadpool->threads_count;
	/* Walk the ring of workers once, starting just after ourselves. */
	for (size_t tid = (thread_number + 1) % threads_count; tid != thread_number; tid = (tid + 1) % threads_count) {
		struct thread_info* other_thread = &threadpool->threads[tid];
		/* NOTE(review): unlocked peek at the victim's state; a stale read only
		   causes a harmless skip or a failed atomic_decrement. */
		if (other_thread->state != thread_state_idle) {
			while (atomic_decrement(&other_thread->range_length)) {
				/* Steal from the tail so the owner keeps consuming from the head. */
				const size_t item_id = __sync_sub_and_fetch(&other_thread->range_end, 1);
				function(argument, item_id);
			}
		}
	}
}
170
/*
 * Worker thread entry point. arg points at this worker's thread_info slot
 * inside the pool's trailing threads[] array.
 */
static void* thread_main(void* arg) {
	struct thread_info* thread = (struct thread_info*) arg;
	/* Recover the owning pool: step back thread_number slots to threads[0],
	   then one struct pthreadpool before it — threads[] is the flexible
	   array member at the end of struct pthreadpool, and the static asserts
	   keep both structures cacheline-multiples so the offsets line up. */
	struct pthreadpool* threadpool = ((struct pthreadpool*) (thread - thread->thread_number)) - 1;

	/* Check in, so pthreadpool_create knows this worker is initialized */
	checkin_worker_thread(threadpool);

	/* Monitor the state changes and act accordingly */
	for (;;) {
		/* Lock the state mutex */
		pthread_mutex_lock(&threadpool->state_mutex);
		/* Read the state; sleep while there is nothing to do */
		enum thread_state state;
		while ((state = thread->state) == thread_state_idle) {
			/* Wait for state change */
			pthread_cond_wait(&threadpool->state_condvar, &threadpool->state_mutex);
		}
		/* Read non-idle state */
		pthread_mutex_unlock(&threadpool->state_mutex);
		switch (state) {
			case thread_state_compute_1d:
				thread_compute_1d(threadpool, thread);
				break;
			case thread_state_shutdown:
				return NULL;
			case thread_state_idle:
				/* To inhibit compiler warning */
				break;
		}
		/* Notify the master thread that we finished processing */
		thread->state = thread_state_idle;
		checkin_worker_thread(threadpool);
	};
}
205
206struct pthreadpool* pthreadpool_create(size_t threads_count) {
207 if (threads_count == 0) {
208 threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN);
209 }
210 struct pthreadpool* threadpool = memalign(64, sizeof(struct pthreadpool) + threads_count * sizeof(struct thread_info));
211 memset(threadpool, 0, sizeof(struct pthreadpool) + threads_count * sizeof(struct thread_info));
212 threadpool->threads_count = threads_count;
213 pthread_mutex_init(&threadpool->execution_mutex, NULL);
214 pthread_mutex_init(&threadpool->barrier_mutex, NULL);
215 pthread_cond_init(&threadpool->barrier_condvar, NULL);
216 pthread_mutex_init(&threadpool->state_mutex, NULL);
217 pthread_cond_init(&threadpool->state_condvar, NULL);
218
219 for (size_t tid = 0; tid < threads_count; tid++) {
220 threadpool->threads[tid].thread_number = tid;
221 pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]);
222 }
223
224 /* Wait until all threads initialize */
225 wait_worker_threads(threadpool);
226 return threadpool;
227}
228
229uint32_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) {
230 return threadpool->threads_count;
231}
232
/*
 * Computes (a * b) / d exactly, using double-width arithmetic so the
 * intermediate product cannot overflow. Used to partition `items` across
 * threads as items*tid/threads_count.
 *
 * Bug fix: the previous code applied the (size_t) cast to the product
 * rather than to the quotient — `(size_t)(wide_product) / wide_d` — which
 * truncated the product to native width BEFORE dividing and defeated the
 * entire wide-arithmetic scheme whenever a*b overflowed size_t. The cast
 * now wraps the full wide-width division.
 */
static inline size_t multiply_divide(size_t a, size_t b, size_t d) {
	#if defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 4)
		return (size_t) ((((uint64_t) a) * ((uint64_t) b)) / ((uint64_t) d));
	#elif defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 8)
		return (size_t) ((((__uint128_t) a) * ((__uint128_t) b)) / ((__uint128_t) d));
	#else
		#error "Unsupported platform"
	#endif
}
242
/*
 * Runs function(argument, i) for every i in [0, items) on the pool's worker
 * threads and blocks until all items are processed. Serialized against
 * other pthreadpool_compute_* callers by execution_mutex.
 */
void pthreadpool_compute_1d(
	struct pthreadpool* threadpool,
	pthreadpool_function_1d_t function,
	void* argument,
	size_t items)
{
	/* Protect the global threadpool structures */
	pthread_mutex_lock(&threadpool->execution_mutex);

	/* Spread the work between threads.
	   Writing ranges/states without a lock is safe only because every worker
	   is asleep until wakeup_worker_threads broadcasts below. */
	for (size_t tid = 0; tid < threadpool->threads_count; tid++) {
		struct thread_info* thread = &threadpool->threads[tid];
		/* items*tid/threads_count partitioning spreads any remainder evenly;
		   multiply_divide does the product in double-width arithmetic. */
		thread->range_start = multiply_divide(items, tid, threadpool->threads_count);
		thread->range_end = multiply_divide(items, tid + 1, threadpool->threads_count);
		thread->range_length = thread->range_end - thread->range_start;
		thread->state = thread_state_compute_1d;
	}

	/* Setup global arguments */
	threadpool->function = function;
	threadpool->argument = argument;

	/* Wake up the threads */
	wakeup_worker_threads(threadpool);

	/* Wait until the threads finish computation */
	wait_worker_threads(threadpool);

	/* Unprotect the global threadpool structures */
	pthread_mutex_unlock(&threadpool->execution_mutex);
}
274
275void pthreadpool_destroy(struct pthreadpool* threadpool) {
276 /* Update threads' states */
277 for (size_t tid = 0; tid < threadpool->threads_count; tid++) {
278 threadpool->threads[tid].state = thread_state_shutdown;
279 }
280
281 /* Wake up the threads */
282 wakeup_worker_threads(threadpool);
283
284 /* Wait until all threads return */
285 for (size_t tid = 0; tid < threadpool->threads_count; tid++) {
286 pthread_join(threadpool->threads[tid].thread_object, NULL);
287 }
288
289 /* Release resources */
290 pthread_mutex_destroy(&threadpool->execution_mutex);
291 pthread_mutex_destroy(&threadpool->barrier_mutex);
292 pthread_cond_destroy(&threadpool->barrier_condvar);
293 pthread_mutex_destroy(&threadpool->state_mutex);
294 pthread_cond_destroy(&threadpool->state_condvar);
295 free(threadpool);
296}