blob: 81bd66da9ef789aa0d0bd78290bfb3cb574a56a4 [file] [log] [blame]
/******************************************************************************
 *
 *  Copyright 2014 Google, Inc.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at:
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 ******************************************************************************/
18
Jack Hef2af1c42016-12-13 01:59:12 -080019#include <base/logging.h>
Pavlin Radoslavov1a3844f2015-09-25 11:21:15 -070020#include <string.h>
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070021
Marie Janssen21da6372016-11-02 18:31:55 -070022#include <mutex>
23
Sharvil Nanavati0f9b91e2015-03-12 15:42:50 -070024#include "osi/include/allocator.h"
25#include "osi/include/fixed_queue.h"
26#include "osi/include/list.h"
27#include "osi/include/osi.h"
Sharvil Nanavati0f9b91e2015-03-12 15:42:50 -070028#include "osi/include/reactor.h"
Myles Watsonb55040c2016-10-19 13:15:34 -070029#include "osi/include/semaphore.h"
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070030
Etan Cohen3e59b5b2015-03-31 17:15:53 -070031typedef struct fixed_queue_t {
Myles Watsonb55040c2016-10-19 13:15:34 -070032 list_t* list;
33 semaphore_t* enqueue_sem;
34 semaphore_t* dequeue_sem;
Marie Janssen21da6372016-11-02 18:31:55 -070035 std::mutex* mutex;
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070036 size_t capacity;
Zach Johnsonbd522a42014-08-15 16:44:46 -070037
Myles Watsonb55040c2016-10-19 13:15:34 -070038 reactor_object_t* dequeue_object;
Zach Johnsonbd522a42014-08-15 16:44:46 -070039 fixed_queue_cb dequeue_ready;
Myles Watsonb55040c2016-10-19 13:15:34 -070040 void* dequeue_context;
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070041} fixed_queue_t;
42
// Callback handed to the reactor by fixed_queue_register_dequeue(); declared
// here because its definition appears later in this file.
static void internal_dequeue_ready(void* context);
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070044
Myles Watsonb55040c2016-10-19 13:15:34 -070045fixed_queue_t* fixed_queue_new(size_t capacity) {
46 fixed_queue_t* ret =
47 static_cast<fixed_queue_t*>(osi_calloc(sizeof(fixed_queue_t)));
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070048
Marie Janssen21da6372016-11-02 18:31:55 -070049 ret->mutex = new std::mutex;
Zach Johnson384f8a92014-08-25 23:22:24 -070050 ret->capacity = capacity;
51
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070052 ret->list = list_new(NULL);
Myles Watsonb55040c2016-10-19 13:15:34 -070053 if (!ret->list) goto error;
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070054
55 ret->enqueue_sem = semaphore_new(capacity);
Myles Watsonb55040c2016-10-19 13:15:34 -070056 if (!ret->enqueue_sem) goto error;
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070057
58 ret->dequeue_sem = semaphore_new(0);
Myles Watsonb55040c2016-10-19 13:15:34 -070059 if (!ret->dequeue_sem) goto error;
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070060
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070061 return ret;
62
Pavlin Radoslavov1a3844f2015-09-25 11:21:15 -070063error:
Zach Johnson384f8a92014-08-25 23:22:24 -070064 fixed_queue_free(ret, NULL);
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070065 return NULL;
66}
67
Myles Watsonb55040c2016-10-19 13:15:34 -070068void fixed_queue_free(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
69 if (!queue) return;
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070070
Zach Johnsonbd522a42014-08-15 16:44:46 -070071 fixed_queue_unregister_dequeue(queue);
72
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070073 if (free_cb)
Myles Watsonb55040c2016-10-19 13:15:34 -070074 for (const list_node_t* node = list_begin(queue->list);
75 node != list_end(queue->list); node = list_next(node))
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070076 free_cb(list_node(node));
77
78 list_free(queue->list);
79 semaphore_free(queue->enqueue_sem);
80 semaphore_free(queue->dequeue_sem);
Marie Janssen21da6372016-11-02 18:31:55 -070081 delete queue->mutex;
Zach Johnson384f8a92014-08-25 23:22:24 -070082 osi_free(queue);
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070083}
84
Myles Watsonb55040c2016-10-19 13:15:34 -070085void fixed_queue_flush(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
86 if (!queue) return;
Pavlin Radoslavov3335cf42016-08-19 15:32:30 -070087
88 while (!fixed_queue_is_empty(queue)) {
Myles Watsonb55040c2016-10-19 13:15:34 -070089 void* data = fixed_queue_try_dequeue(queue);
Pavlin Radoslavov3335cf42016-08-19 15:32:30 -070090 if (free_cb != NULL) {
91 free_cb(data);
92 }
93 }
94}
95
Myles Watsonb55040c2016-10-19 13:15:34 -070096bool fixed_queue_is_empty(fixed_queue_t* queue) {
97 if (queue == NULL) return true;
Zach Johnson93a1c802014-07-30 13:40:09 -070098
Marie Janssen21da6372016-11-02 18:31:55 -070099 std::lock_guard<std::mutex> lock(*queue->mutex);
100 return list_is_empty(queue->list);
Zach Johnson93a1c802014-07-30 13:40:09 -0700101}
102
Myles Watsonb55040c2016-10-19 13:15:34 -0700103size_t fixed_queue_length(fixed_queue_t* queue) {
104 if (queue == NULL) return 0;
Pavlin Radoslavov1a3844f2015-09-25 11:21:15 -0700105
Marie Janssen21da6372016-11-02 18:31:55 -0700106 std::lock_guard<std::mutex> lock(*queue->mutex);
107 return list_length(queue->list);
Pavlin Radoslavov1a3844f2015-09-25 11:21:15 -0700108}
109
Myles Watsonb55040c2016-10-19 13:15:34 -0700110size_t fixed_queue_capacity(fixed_queue_t* queue) {
Jack Hef2af1c42016-12-13 01:59:12 -0800111 CHECK(queue != NULL);
Chris Mantonc446cbe2014-08-05 11:07:23 -0700112
113 return queue->capacity;
Sharvil Nanavatic11b4072014-05-02 23:55:09 -0700114}
115
Myles Watsonb55040c2016-10-19 13:15:34 -0700116void fixed_queue_enqueue(fixed_queue_t* queue, void* data) {
Jack Hef2af1c42016-12-13 01:59:12 -0800117 CHECK(queue != NULL);
118 CHECK(data != NULL);
Sharvil Nanavatic11b4072014-05-02 23:55:09 -0700119
120 semaphore_wait(queue->enqueue_sem);
121
Marie Janssen21da6372016-11-02 18:31:55 -0700122 {
123 std::lock_guard<std::mutex> lock(*queue->mutex);
124 list_append(queue->list, data);
125 }
Sharvil Nanavatic11b4072014-05-02 23:55:09 -0700126
127 semaphore_post(queue->dequeue_sem);
128}
129
Myles Watsonb55040c2016-10-19 13:15:34 -0700130void* fixed_queue_dequeue(fixed_queue_t* queue) {
Jack Hef2af1c42016-12-13 01:59:12 -0800131 CHECK(queue != NULL);
Sharvil Nanavatic11b4072014-05-02 23:55:09 -0700132
Sharvil Nanavatic11b4072014-05-02 23:55:09 -0700133 semaphore_wait(queue->dequeue_sem);
134
Marie Janssen21da6372016-11-02 18:31:55 -0700135 void* ret = NULL;
136 {
137 std::lock_guard<std::mutex> lock(*queue->mutex);
138 ret = list_front(queue->list);
139 list_remove(queue->list, ret);
140 }
Sharvil Nanavatic5856ba2014-06-23 12:25:40 -0700141
142 semaphore_post(queue->enqueue_sem);
143
144 return ret;
145}
146
Myles Watsonb55040c2016-10-19 13:15:34 -0700147bool fixed_queue_try_enqueue(fixed_queue_t* queue, void* data) {
Jack Hef2af1c42016-12-13 01:59:12 -0800148 CHECK(queue != NULL);
149 CHECK(data != NULL);
Sharvil Nanavatic5856ba2014-06-23 12:25:40 -0700150
Myles Watsonb55040c2016-10-19 13:15:34 -0700151 if (!semaphore_try_wait(queue->enqueue_sem)) return false;
Sharvil Nanavatic5856ba2014-06-23 12:25:40 -0700152
Marie Janssen21da6372016-11-02 18:31:55 -0700153 {
154 std::lock_guard<std::mutex> lock(*queue->mutex);
155 list_append(queue->list, data);
156 }
Sharvil Nanavatic5856ba2014-06-23 12:25:40 -0700157
158 semaphore_post(queue->dequeue_sem);
159 return true;
160}
161
Myles Watsonb55040c2016-10-19 13:15:34 -0700162void* fixed_queue_try_dequeue(fixed_queue_t* queue) {
163 if (queue == NULL) return NULL;
Sharvil Nanavatic5856ba2014-06-23 12:25:40 -0700164
Myles Watsonb55040c2016-10-19 13:15:34 -0700165 if (!semaphore_try_wait(queue->dequeue_sem)) return NULL;
Sharvil Nanavatic5856ba2014-06-23 12:25:40 -0700166
Marie Janssen21da6372016-11-02 18:31:55 -0700167 void* ret = NULL;
168 {
169 std::lock_guard<std::mutex> lock(*queue->mutex);
170 ret = list_front(queue->list);
171 list_remove(queue->list, ret);
172 }
Sharvil Nanavatic11b4072014-05-02 23:55:09 -0700173
174 semaphore_post(queue->enqueue_sem);
175
176 return ret;
177}
Sharvil Nanavatiab606b52014-07-04 16:33:37 -0700178
Myles Watsonb55040c2016-10-19 13:15:34 -0700179void* fixed_queue_try_peek_first(fixed_queue_t* queue) {
180 if (queue == NULL) return NULL;
Zach Johnsonbd522a42014-08-15 16:44:46 -0700181
Marie Janssen21da6372016-11-02 18:31:55 -0700182 std::lock_guard<std::mutex> lock(*queue->mutex);
183 return list_is_empty(queue->list) ? NULL : list_front(queue->list);
Zach Johnsonbd522a42014-08-15 16:44:46 -0700184}
185
Myles Watsonb55040c2016-10-19 13:15:34 -0700186void* fixed_queue_try_peek_last(fixed_queue_t* queue) {
187 if (queue == NULL) return NULL;
Pavlin Radoslavov1a3844f2015-09-25 11:21:15 -0700188
Marie Janssen21da6372016-11-02 18:31:55 -0700189 std::lock_guard<std::mutex> lock(*queue->mutex);
190 return list_is_empty(queue->list) ? NULL : list_back(queue->list);
Pavlin Radoslavov1a3844f2015-09-25 11:21:15 -0700191}
192
Myles Watsonb55040c2016-10-19 13:15:34 -0700193void* fixed_queue_try_remove_from_queue(fixed_queue_t* queue, void* data) {
194 if (queue == NULL) return NULL;
Pavlin Radoslavov1a3844f2015-09-25 11:21:15 -0700195
Pavlin Radoslavov153bdfb2015-11-13 18:36:56 -0800196 bool removed = false;
Marie Janssen21da6372016-11-02 18:31:55 -0700197 {
198 std::lock_guard<std::mutex> lock(*queue->mutex);
199 if (list_contains(queue->list, data) &&
200 semaphore_try_wait(queue->dequeue_sem)) {
201 removed = list_remove(queue->list, data);
Jack Hef2af1c42016-12-13 01:59:12 -0800202 CHECK(removed);
Marie Janssen21da6372016-11-02 18:31:55 -0700203 }
Pavlin Radoslavov153bdfb2015-11-13 18:36:56 -0800204 }
Pavlin Radoslavov1a3844f2015-09-25 11:21:15 -0700205
Pavlin Radoslavov153bdfb2015-11-13 18:36:56 -0800206 if (removed) {
207 semaphore_post(queue->enqueue_sem);
Pavlin Radoslavov1a3844f2015-09-25 11:21:15 -0700208 return data;
Pavlin Radoslavov153bdfb2015-11-13 18:36:56 -0800209 }
Pavlin Radoslavov1a3844f2015-09-25 11:21:15 -0700210 return NULL;
211}
212
Myles Watsonb55040c2016-10-19 13:15:34 -0700213list_t* fixed_queue_get_list(fixed_queue_t* queue) {
Jack Hef2af1c42016-12-13 01:59:12 -0800214 CHECK(queue != NULL);
Pavlin Radoslavov1a3844f2015-09-25 11:21:15 -0700215
Marie Janssen21da6372016-11-02 18:31:55 -0700216 // NOTE: Using the list in this way is not thread-safe.
217 // Using this list in any context where threads can call other functions
218 // to the queue can break our assumptions and the queue in general.
Pavlin Radoslavov1a3844f2015-09-25 11:21:15 -0700219 return queue->list;
220}
221
Myles Watsonb55040c2016-10-19 13:15:34 -0700222int fixed_queue_get_dequeue_fd(const fixed_queue_t* queue) {
Jack Hef2af1c42016-12-13 01:59:12 -0800223 CHECK(queue != NULL);
Sharvil Nanavatiab606b52014-07-04 16:33:37 -0700224 return semaphore_get_fd(queue->dequeue_sem);
225}
226
Myles Watsonb55040c2016-10-19 13:15:34 -0700227int fixed_queue_get_enqueue_fd(const fixed_queue_t* queue) {
Jack Hef2af1c42016-12-13 01:59:12 -0800228 CHECK(queue != NULL);
Sharvil Nanavatiab606b52014-07-04 16:33:37 -0700229 return semaphore_get_fd(queue->enqueue_sem);
230}
Zach Johnsonbd522a42014-08-15 16:44:46 -0700231
Myles Watsonb55040c2016-10-19 13:15:34 -0700232void fixed_queue_register_dequeue(fixed_queue_t* queue, reactor_t* reactor,
233 fixed_queue_cb ready_cb, void* context) {
Jack Hef2af1c42016-12-13 01:59:12 -0800234 CHECK(queue != NULL);
235 CHECK(reactor != NULL);
236 CHECK(ready_cb != NULL);
Zach Johnsonbd522a42014-08-15 16:44:46 -0700237
238 // Make sure we're not already registered
239 fixed_queue_unregister_dequeue(queue);
240
241 queue->dequeue_ready = ready_cb;
242 queue->dequeue_context = context;
Myles Watsonb55040c2016-10-19 13:15:34 -0700243 queue->dequeue_object =
244 reactor_register(reactor, fixed_queue_get_dequeue_fd(queue), queue,
245 internal_dequeue_ready, NULL);
Zach Johnsonbd522a42014-08-15 16:44:46 -0700246}
247
Myles Watsonb55040c2016-10-19 13:15:34 -0700248void fixed_queue_unregister_dequeue(fixed_queue_t* queue) {
Jack Hef2af1c42016-12-13 01:59:12 -0800249 CHECK(queue != NULL);
Zach Johnsonbd522a42014-08-15 16:44:46 -0700250
251 if (queue->dequeue_object) {
252 reactor_unregister(queue->dequeue_object);
253 queue->dequeue_object = NULL;
254 }
255}
256
Myles Watsonb55040c2016-10-19 13:15:34 -0700257static void internal_dequeue_ready(void* context) {
Jack Hef2af1c42016-12-13 01:59:12 -0800258 CHECK(context != NULL);
Zach Johnsonbd522a42014-08-15 16:44:46 -0700259
Myles Watsonb55040c2016-10-19 13:15:34 -0700260 fixed_queue_t* queue = static_cast<fixed_queue_t*>(context);
Zach Johnsonbd522a42014-08-15 16:44:46 -0700261 queue->dequeue_ready(queue, queue->dequeue_context);
262}