blob: 2d643678dd4eb7392c7fbedc49a19a1164880819 [file] [log] [blame]
/******************************************************************************
 *
 *  Copyright (C) 2014 Google, Inc.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at:
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 ******************************************************************************/
18
Sharvil Nanavatic11b4072014-05-02 23:55:09 -070019#include <assert.h>
20#include <pthread.h>
21#include <stdlib.h>
22
23#include "fixed_queue.h"
24#include "list.h"
25#include "osi.h"
26#include "semaphore.h"
27
28typedef struct fixed_queue_t {
29 list_t *list;
30 semaphore_t *enqueue_sem;
31 semaphore_t *dequeue_sem;
32 pthread_mutex_t lock;
33 size_t capacity;
34} fixed_queue_t;
35
36fixed_queue_t *fixed_queue_new(size_t capacity) {
37 fixed_queue_t *ret = calloc(1, sizeof(fixed_queue_t));
38 if (!ret)
39 goto error;
40
41 ret->list = list_new(NULL);
42 if (!ret->list)
43 goto error;
44
45 ret->enqueue_sem = semaphore_new(capacity);
46 if (!ret->enqueue_sem)
47 goto error;
48
49 ret->dequeue_sem = semaphore_new(0);
50 if (!ret->dequeue_sem)
51 goto error;
52
53 pthread_mutex_init(&ret->lock, NULL);
54 ret->capacity = capacity;
55
56 return ret;
57
58error:;
59 if (ret) {
60 list_free(ret->list);
61 semaphore_free(ret->enqueue_sem);
62 semaphore_free(ret->dequeue_sem);
63 }
64
65 free(ret);
66 return NULL;
67}
68
69void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
70 if (!queue)
71 return;
72
73 if (free_cb)
74 for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node))
75 free_cb(list_node(node));
76
77 list_free(queue->list);
78 semaphore_free(queue->enqueue_sem);
79 semaphore_free(queue->dequeue_sem);
80 pthread_mutex_destroy(&queue->lock);
81 free(queue);
82}
83
84void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
85 assert(queue != NULL);
86 assert(data != NULL);
87
88 semaphore_wait(queue->enqueue_sem);
89
90 pthread_mutex_lock(&queue->lock);
91 list_append(queue->list, data);
92 pthread_mutex_unlock(&queue->lock);
93
94 semaphore_post(queue->dequeue_sem);
95}
96
97void *fixed_queue_dequeue(fixed_queue_t *queue) {
98 assert(queue != NULL);
99
Sharvil Nanavatic11b4072014-05-02 23:55:09 -0700100 semaphore_wait(queue->dequeue_sem);
101
102 pthread_mutex_lock(&queue->lock);
Sharvil Nanavatic5856ba2014-06-23 12:25:40 -0700103 void *ret = list_front(queue->list);
104 list_remove(queue->list, ret);
105 pthread_mutex_unlock(&queue->lock);
106
107 semaphore_post(queue->enqueue_sem);
108
109 return ret;
110}
111
112bool fixed_queue_try_enqueue(fixed_queue_t *queue, void *data) {
113 assert(queue != NULL);
114 assert(data != NULL);
115
116 if (!semaphore_try_wait(queue->enqueue_sem))
117 return false;
118
119 pthread_mutex_lock(&queue->lock);
120 list_append(queue->list, data);
121 pthread_mutex_unlock(&queue->lock);
122
123 semaphore_post(queue->dequeue_sem);
124 return true;
125}
126
127void *fixed_queue_try_dequeue(fixed_queue_t *queue) {
128 assert(queue != NULL);
129
130 if (!semaphore_try_wait(queue->dequeue_sem))
131 return NULL;
132
133 pthread_mutex_lock(&queue->lock);
134 void *ret = list_front(queue->list);
Sharvil Nanavatic11b4072014-05-02 23:55:09 -0700135 list_remove(queue->list, ret);
136 pthread_mutex_unlock(&queue->lock);
137
138 semaphore_post(queue->enqueue_sem);
139
140 return ret;
141}
Sharvil Nanavatiab606b52014-07-04 16:33:37 -0700142
143int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue) {
144 assert(queue != NULL);
145 return semaphore_get_fd(queue->dequeue_sem);
146}
147
148int fixed_queue_get_enqueue_fd(const fixed_queue_t *queue) {
149 assert(queue != NULL);
150 return semaphore_get_fd(queue->enqueue_sem);
151}