blob: 6f0a88fec1f249b3f948f399ed64cec586ae2be2 [file] [log] [blame]
Zach Johnsond18cfd02014-08-04 20:51:06 -07001/******************************************************************************
2 *
3 * Copyright (C) 2014 Google, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 ******************************************************************************/
18
Chris Mantonf8027002015-03-12 09:22:48 -070019#define LOG_TAG "bt_osi_eager_reader"
Zach Johnsond18cfd02014-08-04 20:51:06 -070020
21#include <assert.h>
22#include <errno.h>
23#include <stddef.h>
Etan Cohen3e59b5b2015-03-31 17:15:53 -070024#include <string.h>
Zach Johnsond18cfd02014-08-04 20:51:06 -070025#include <sys/eventfd.h>
Zach Johnsond18cfd02014-08-04 20:51:06 -070026
Sharvil Nanavati0f9b91e2015-03-12 15:42:50 -070027#include "osi/include/allocator.h"
28#include "osi/include/eager_reader.h"
29#include "osi/include/fixed_queue.h"
30#include "osi/include/osi.h"
Sharvil Nanavati44802762014-12-23 23:08:58 -080031#include "osi/include/log.h"
Sharvil Nanavati0f9b91e2015-03-12 15:42:50 -070032#include "osi/include/reactor.h"
33#include "osi/include/thread.h"
Zach Johnsond18cfd02014-08-04 20:51:06 -070034
35#if !defined(EFD_SEMAPHORE)
36# define EFD_SEMAPHORE (1 << 0)
37#endif
38
39typedef struct {
40 size_t length;
41 size_t offset;
42 uint8_t data[];
43} data_buffer_t;
44
45struct eager_reader_t {
46 int bytes_available_fd; // semaphore mode eventfd which counts the number of available bytes
47 int inbound_fd;
48
49 const allocator_t *allocator;
50 size_t buffer_size;
51 fixed_queue_t *buffers;
52 data_buffer_t *current_buffer;
53
54 thread_t *inbound_read_thread;
55 reactor_object_t *inbound_read_object;
56
57 reactor_object_t *outbound_registration;
58 eager_reader_cb outbound_read_ready;
59 void *outbound_context;
60};
61
Zach Johnsonbb170c12014-08-21 21:00:43 -070062static bool has_byte(const eager_reader_t *reader);
Zach Johnsond18cfd02014-08-04 20:51:06 -070063static void inbound_data_waiting(void *context);
64static void internal_outbound_read_ready(void *context);
65
66eager_reader_t *eager_reader_new(
67 int fd_to_read,
68 const allocator_t *allocator,
69 size_t buffer_size,
70 size_t max_buffer_count,
71 const char *thread_name) {
72
73 assert(fd_to_read != INVALID_FD);
74 assert(allocator != NULL);
75 assert(buffer_size > 0);
76 assert(max_buffer_count > 0);
77 assert(thread_name != NULL && *thread_name != '\0');
78
Zach Johnsonee2aa452014-08-26 20:16:03 -070079 eager_reader_t *ret = osi_calloc(sizeof(eager_reader_t));
Zach Johnsond18cfd02014-08-04 20:51:06 -070080 if (!ret) {
Sharvil Nanavati44802762014-12-23 23:08:58 -080081 LOG_ERROR("%s unable to allocate memory for new eager_reader.", __func__);
Zach Johnsond18cfd02014-08-04 20:51:06 -070082 goto error;
83 }
84
85 ret->allocator = allocator;
86 ret->inbound_fd = fd_to_read;
87
88 ret->bytes_available_fd = eventfd(0, EFD_SEMAPHORE);
89 if (ret->bytes_available_fd == INVALID_FD) {
Sharvil Nanavati44802762014-12-23 23:08:58 -080090 LOG_ERROR("%s unable to create output reading semaphore.", __func__);
Zach Johnsond18cfd02014-08-04 20:51:06 -070091 goto error;
92 }
93
94 ret->buffer_size = buffer_size;
95
96 ret->buffers = fixed_queue_new(max_buffer_count);
97 if (!ret->buffers) {
Sharvil Nanavati44802762014-12-23 23:08:58 -080098 LOG_ERROR("%s unable to create buffers queue.", __func__);
Zach Johnsond18cfd02014-08-04 20:51:06 -070099 goto error;
100 }
101
102 ret->inbound_read_thread = thread_new(thread_name);
103 if (!ret->inbound_read_thread) {
Sharvil Nanavati44802762014-12-23 23:08:58 -0800104 LOG_ERROR("%s unable to make reading thread.", __func__);
Zach Johnsond18cfd02014-08-04 20:51:06 -0700105 goto error;
106 }
107
108 ret->inbound_read_object = reactor_register(
109 thread_get_reactor(ret->inbound_read_thread),
110 fd_to_read,
111 ret,
112 inbound_data_waiting,
113 NULL
114 );
115
116 return ret;
117
118error:;
119 eager_reader_free(ret);
120 return NULL;
121}
122
123void eager_reader_free(eager_reader_t *reader) {
124 if (!reader)
125 return;
126
127 eager_reader_unregister(reader);
128
129 // Only unregister from the input if we actually did register
130 if (reader->inbound_read_object)
131 reactor_unregister(reader->inbound_read_object);
132
133 if (reader->bytes_available_fd != INVALID_FD)
134 close(reader->bytes_available_fd);
135
136 // Free the current buffer, because it's not in the queue
137 // and won't be freed below
138 if (reader->current_buffer)
139 reader->allocator->free(reader->current_buffer);
140
141 fixed_queue_free(reader->buffers, reader->allocator->free);
142 thread_free(reader->inbound_read_thread);
Zach Johnsonee2aa452014-08-26 20:16:03 -0700143 osi_free(reader);
Zach Johnsond18cfd02014-08-04 20:51:06 -0700144}
145
146void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context) {
147 assert(reader != NULL);
148 assert(reactor != NULL);
149 assert(read_cb != NULL);
150
151 // Make sure the reader isn't currently registered.
152 eager_reader_unregister(reader);
153
154 reader->outbound_read_ready = read_cb;
155 reader->outbound_context = context;
156 reader->outbound_registration = reactor_register(reactor, reader->bytes_available_fd, reader, internal_outbound_read_ready, NULL);
157}
158
159void eager_reader_unregister(eager_reader_t *reader) {
160 assert(reader != NULL);
161
162 if (reader->outbound_registration) {
163 reactor_unregister(reader->outbound_registration);
164 reader->outbound_registration = NULL;
165 }
166}
167
168// SEE HEADER FOR THREAD SAFETY NOTE
Zach Johnsonbb170c12014-08-21 21:00:43 -0700169size_t eager_reader_read(eager_reader_t *reader, uint8_t *buffer, size_t max_size, bool block) {
Zach Johnsond18cfd02014-08-04 20:51:06 -0700170 assert(reader != NULL);
Zach Johnsonbb170c12014-08-21 21:00:43 -0700171 assert(buffer != NULL);
Zach Johnsond18cfd02014-08-04 20:51:06 -0700172
Zach Johnsonbb170c12014-08-21 21:00:43 -0700173 size_t bytes_read = 0;
Zach Johnsond18cfd02014-08-04 20:51:06 -0700174
Zach Johnsonbb170c12014-08-21 21:00:43 -0700175 while (bytes_read < max_size) {
176 if (!block && !has_byte(reader))
177 return bytes_read;
Zach Johnsond18cfd02014-08-04 20:51:06 -0700178
Zach Johnsonbb170c12014-08-21 21:00:43 -0700179 eventfd_t value;
180 if (eventfd_read(reader->bytes_available_fd, &value) == -1)
Sharvil Nanavati44802762014-12-23 23:08:58 -0800181 LOG_ERROR("%s unable to read semaphore for output data.", __func__);
Zach Johnsond18cfd02014-08-04 20:51:06 -0700182
Zach Johnsonbb170c12014-08-21 21:00:43 -0700183 if (!reader->current_buffer)
184 reader->current_buffer = fixed_queue_dequeue(reader->buffers);
185
186 buffer[bytes_read] = reader->current_buffer->data[reader->current_buffer->offset];
187 reader->current_buffer->offset++;
188 bytes_read++;
189
190 // Prep for next byte
191 if (reader->current_buffer->offset >= reader->current_buffer->length) {
192 reader->allocator->free(reader->current_buffer);
193 reader->current_buffer = NULL;
194 }
Zach Johnsond18cfd02014-08-04 20:51:06 -0700195 }
196
Zach Johnsonbb170c12014-08-21 21:00:43 -0700197 return bytes_read;
Zach Johnsond18cfd02014-08-04 20:51:06 -0700198}
199
Zach Johnsonbb170c12014-08-21 21:00:43 -0700200static bool has_byte(const eager_reader_t *reader) {
Zach Johnsond18cfd02014-08-04 20:51:06 -0700201 assert(reader != NULL);
202
203 fd_set read_fds;
204 FD_ZERO(&read_fds);
205 FD_SET(reader->bytes_available_fd, &read_fds);
206
207 // Immediate timeout
208 struct timeval timeout;
209 timeout.tv_sec = 0;
210 timeout.tv_usec = 0;
211
212 select(reader->bytes_available_fd + 1, &read_fds, NULL, NULL, &timeout);
213 return FD_ISSET(reader->bytes_available_fd, &read_fds);
214}
215
216static void inbound_data_waiting(void *context) {
217 eager_reader_t *reader = (eager_reader_t *)context;
218
219 data_buffer_t *buffer = (data_buffer_t *)reader->allocator->alloc(reader->buffer_size + sizeof(data_buffer_t));
220 if (!buffer) {
Sharvil Nanavati44802762014-12-23 23:08:58 -0800221 LOG_ERROR("%s couldn't aquire memory for inbound data buffer.", __func__);
Zach Johnsond18cfd02014-08-04 20:51:06 -0700222 return;
223 }
224
225 buffer->length = 0;
226 buffer->offset = 0;
227
228 int bytes_read = read(reader->inbound_fd, buffer->data, reader->buffer_size);
229 if (bytes_read > 0) {
230 // Save the data for later
231 buffer->length = bytes_read;
232 fixed_queue_enqueue(reader->buffers, buffer);
233
234 // Tell consumers data is available by incrementing
235 // the semaphore by the number of bytes we just read
236 eventfd_write(reader->bytes_available_fd, bytes_read);
237 } else {
238 if (bytes_read == 0)
Sharvil Nanavati44802762014-12-23 23:08:58 -0800239 LOG_WARN("%s fd said bytes existed, but none were found.", __func__);
Zach Johnsond18cfd02014-08-04 20:51:06 -0700240 else
Sharvil Nanavati44802762014-12-23 23:08:58 -0800241 LOG_WARN("%s unable to read from file descriptor: %s", __func__, strerror(errno));
Zach Johnsond18cfd02014-08-04 20:51:06 -0700242
243 reader->allocator->free(buffer);
244 }
245}
246
247static void internal_outbound_read_ready(void *context) {
248 assert(context != NULL);
249
250 eager_reader_t *reader = (eager_reader_t *)context;
251 reader->outbound_read_ready(reader, reader->outbound_context);
252}