/******************************************************************************
 *
 *  Copyright (C) 2014 Google, Inc.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at:
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 ******************************************************************************/

#include <assert.h>
#include <pthread.h>
#include <string.h>

#include "osi/include/allocator.h"
#include "osi/include/fixed_queue.h"
#include "osi/include/list.h"
#include "osi/include/osi.h"
#include "osi/include/semaphore.h"
#include "osi/include/reactor.h"

typedef struct fixed_queue_t {
  list_t *list;
  semaphore_t *enqueue_sem;
  semaphore_t *dequeue_sem;
  pthread_mutex_t lock;
  size_t capacity;

  reactor_object_t *dequeue_object;
  fixed_queue_cb dequeue_ready;
  void *dequeue_context;
} fixed_queue_t;

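// A minimal usage sketch (illustrative only, not part of this module): one
// thread produces heap-allocated messages while another consumes them. The
// |msg_t| type, the helper functions and the capacity of 10 are assumptions
// made up for the example.
//
//   fixed_queue_t *queue = fixed_queue_new(10);
//   // Producer thread: blocks once 10 messages are already pending.
//   fixed_queue_enqueue(queue, (void *)make_msg());
//   // Consumer thread: blocks until a message is available.
//   msg_t *msg = fixed_queue_dequeue(queue);
//   handle_msg(msg);
//   // Shutdown: free any messages still queued.
//   fixed_queue_free(queue, osi_free);
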
static void internal_dequeue_ready(void *context);

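// Creates a new fixed queue that can hold at most |capacity| elements.
// Returns NULL if the underlying list or semaphores could not be allocated.
// The returned queue must be released with |fixed_queue_free|.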
fixed_queue_t *fixed_queue_new(size_t capacity) {
  fixed_queue_t *ret = osi_calloc(sizeof(fixed_queue_t));

  pthread_mutex_init(&ret->lock, NULL);
  ret->capacity = capacity;

  ret->list = list_new(NULL);
  if (!ret->list)
    goto error;

  ret->enqueue_sem = semaphore_new(capacity);
  if (!ret->enqueue_sem)
    goto error;

  ret->dequeue_sem = semaphore_new(0);
  if (!ret->dequeue_sem)
    goto error;

  return ret;

error:
  fixed_queue_free(ret, NULL);
  return NULL;
}

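// Frees |queue| and, if |free_cb| is not NULL, applies it to every element
// still in the queue. Any registered dequeue callback is unregistered first.
// |queue| may be NULL, in which case this is a no-op.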
void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
  if (!queue)
    return;

  fixed_queue_unregister_dequeue(queue);

  if (free_cb)
    for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node))
      free_cb(list_node(node));

  list_free(queue->list);
  semaphore_free(queue->enqueue_sem);
  semaphore_free(queue->dequeue_sem);
  pthread_mutex_destroy(&queue->lock);
  osi_free(queue);
}

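// Drains every element currently in |queue|, invoking |free_cb| on each one
// when |free_cb| is not NULL. |queue| may be NULL.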
void fixed_queue_flush(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
  if (!queue)
    return;

  while (!fixed_queue_is_empty(queue)) {
    void *data = fixed_queue_try_dequeue(queue);
    if (free_cb != NULL) {
      free_cb(data);
    }
  }
}

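// Returns true if |queue| is NULL or currently holds no elements.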
bool fixed_queue_is_empty(fixed_queue_t *queue) {
  if (queue == NULL)
    return true;

  pthread_mutex_lock(&queue->lock);
  bool is_empty = list_is_empty(queue->list);
  pthread_mutex_unlock(&queue->lock);

  return is_empty;
}

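// Returns the number of elements currently in |queue|, or 0 if |queue| is NULL.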
size_t fixed_queue_length(fixed_queue_t *queue) {
  if (queue == NULL)
    return 0;

  pthread_mutex_lock(&queue->lock);
  size_t length = list_length(queue->list);
  pthread_mutex_unlock(&queue->lock);

  return length;
}

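// Returns the maximum number of elements |queue| can hold.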
size_t fixed_queue_capacity(fixed_queue_t *queue) {
  assert(queue != NULL);

  return queue->capacity;
}

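// Blocks until there is room in |queue|, then appends |data|. Posting the
// dequeue semaphore wakes a consumer blocked in |fixed_queue_dequeue| or a
// reactor watching the dequeue fd.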
void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
  assert(queue != NULL);
  assert(data != NULL);

  semaphore_wait(queue->enqueue_sem);

  pthread_mutex_lock(&queue->lock);
  list_append(queue->list, data);
  pthread_mutex_unlock(&queue->lock);

  semaphore_post(queue->dequeue_sem);
}

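// Blocks until |queue| is non-empty, then removes and returns its front
// element. Posting the enqueue semaphore releases a producer waiting for room.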
void *fixed_queue_dequeue(fixed_queue_t *queue) {
  assert(queue != NULL);

  semaphore_wait(queue->dequeue_sem);

  pthread_mutex_lock(&queue->lock);
  void *ret = list_front(queue->list);
  list_remove(queue->list, ret);
  pthread_mutex_unlock(&queue->lock);

  semaphore_post(queue->enqueue_sem);

  return ret;
}

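// Non-blocking enqueue: returns false immediately if |queue| is full,
// otherwise appends |data| and returns true.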
bool fixed_queue_try_enqueue(fixed_queue_t *queue, void *data) {
  assert(queue != NULL);
  assert(data != NULL);

  if (!semaphore_try_wait(queue->enqueue_sem))
    return false;

  pthread_mutex_lock(&queue->lock);
  list_append(queue->list, data);
  pthread_mutex_unlock(&queue->lock);

  semaphore_post(queue->dequeue_sem);
  return true;
}

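// Non-blocking dequeue: returns the front element, or NULL if |queue| is
// NULL or empty.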
void *fixed_queue_try_dequeue(fixed_queue_t *queue) {
  if (queue == NULL)
    return NULL;

  if (!semaphore_try_wait(queue->dequeue_sem))
    return NULL;

  pthread_mutex_lock(&queue->lock);
  void *ret = list_front(queue->list);
  list_remove(queue->list, ret);
  pthread_mutex_unlock(&queue->lock);

  semaphore_post(queue->enqueue_sem);

  return ret;
}

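// Returns the front element of |queue| without removing it, or NULL if
// |queue| is NULL or empty.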
void *fixed_queue_try_peek_first(fixed_queue_t *queue) {
  if (queue == NULL)
    return NULL;

  pthread_mutex_lock(&queue->lock);
  void *ret = list_is_empty(queue->list) ? NULL : list_front(queue->list);
  pthread_mutex_unlock(&queue->lock);

  return ret;
}

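// Returns the last element of |queue| without removing it, or NULL if
// |queue| is NULL or empty.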
void *fixed_queue_try_peek_last(fixed_queue_t *queue) {
  if (queue == NULL)
    return NULL;

  pthread_mutex_lock(&queue->lock);
  void *ret = list_is_empty(queue->list) ? NULL : list_back(queue->list);
  pthread_mutex_unlock(&queue->lock);

  return ret;
}

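// Removes |data| from |queue| if it is present and returns it; otherwise
// returns NULL. The element is removed only if the dequeue semaphore can be
// acquired, which keeps the semaphore counts consistent with the list.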
void *fixed_queue_try_remove_from_queue(fixed_queue_t *queue, void *data) {
  if (queue == NULL)
    return NULL;

  bool removed = false;
  pthread_mutex_lock(&queue->lock);
  if (list_contains(queue->list, data) &&
      semaphore_try_wait(queue->dequeue_sem)) {
    removed = list_remove(queue->list, data);
    assert(removed);
  }
  pthread_mutex_unlock(&queue->lock);

  if (removed) {
    semaphore_post(queue->enqueue_sem);
    return data;
  }
  return NULL;
}

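// Exposes the underlying list so callers can iterate the queued elements.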
list_t *fixed_queue_get_list(fixed_queue_t *queue) {
  assert(queue != NULL);

  // NOTE: This function is not thread safe, so there is no point in
  // calling pthread_mutex_lock() / pthread_mutex_unlock().
  return queue->list;
}

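// Returns a file descriptor that becomes readable when |queue| has data to
// dequeue, suitable for registering with a reactor or poll loop.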
int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue) {
  assert(queue != NULL);
  return semaphore_get_fd(queue->dequeue_sem);
}

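// Returns a file descriptor that becomes readable when |queue| has room for
// another element, suitable for registering with a reactor or poll loop.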
int fixed_queue_get_enqueue_fd(const fixed_queue_t *queue) {
  assert(queue != NULL);
  return semaphore_get_fd(queue->enqueue_sem);
}

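// Registers |ready_cb| with |reactor| so it is invoked with |context|
// whenever |queue| has data ready to be dequeued. Any previous registration
// is replaced.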
void fixed_queue_register_dequeue(fixed_queue_t *queue, reactor_t *reactor, fixed_queue_cb ready_cb, void *context) {
  assert(queue != NULL);
  assert(reactor != NULL);
  assert(ready_cb != NULL);

  // Make sure we're not already registered
  fixed_queue_unregister_dequeue(queue);

  queue->dequeue_ready = ready_cb;
  queue->dequeue_context = context;
  queue->dequeue_object = reactor_register(
    reactor,
    fixed_queue_get_dequeue_fd(queue),
    queue,
    internal_dequeue_ready,
    NULL
  );
}

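// Cancels a previous |fixed_queue_register_dequeue| registration, if any.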
void fixed_queue_unregister_dequeue(fixed_queue_t *queue) {
  assert(queue != NULL);

  if (queue->dequeue_object) {
    reactor_unregister(queue->dequeue_object);
    queue->dequeue_object = NULL;
  }
}

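// Reactor trampoline: recovers the queue from |context| and forwards the
// event to the user-supplied dequeue-ready callback.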
static void internal_dequeue_ready(void *context) {
  assert(context != NULL);

  fixed_queue_t *queue = context;
  queue->dequeue_ready(queue, queue->dequeue_context);
}