blob: 07a944cad3cf8d48205231246c14d591d6586729 [file] [log] [blame]
Zach Johnsond18cfd02014-08-04 20:51:06 -07001/******************************************************************************
2 *
3 * Copyright (C) 2014 Google, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 ******************************************************************************/
18
Chris Mantonf8027002015-03-12 09:22:48 -070019#define LOG_TAG "bt_osi_eager_reader"
Zach Johnsond18cfd02014-08-04 20:51:06 -070020
#include "osi/include/eager_reader.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <string.h>
#include <sys/eventfd.h>
#include <unistd.h>

#include "osi/include/fixed_queue.h"
#include "osi/include/log.h"
#include "osi/include/osi.h"
#include "osi/include/reactor.h"
Zach Johnsond18cfd02014-08-04 20:51:06 -070033
// Fallback definition for build environments whose headers do not provide
// EFD_SEMAPHORE.
#ifndef EFD_SEMAPHORE
#define EFD_SEMAPHORE (1 << 0)
#endif
37
// One chunk of bytes read from the inbound file descriptor. The header and
// the payload are allocated together as a single block.
typedef struct {
  // Number of valid bytes stored in |data|.
  size_t length;
  // Index into |data| of the first byte not yet handed to the consumer.
  size_t offset;
  // Payload bytes (flexible array member).
  uint8_t data[];
} data_buffer_t;
43
44struct eager_reader_t {
45 int bytes_available_fd; // semaphore mode eventfd which counts the number of available bytes
46 int inbound_fd;
47
48 const allocator_t *allocator;
49 size_t buffer_size;
50 fixed_queue_t *buffers;
51 data_buffer_t *current_buffer;
52
53 thread_t *inbound_read_thread;
54 reactor_object_t *inbound_read_object;
55
56 reactor_object_t *outbound_registration;
57 eager_reader_cb outbound_read_ready;
58 void *outbound_context;
59};
60
Zach Johnsonbb170c12014-08-21 21:00:43 -070061static bool has_byte(const eager_reader_t *reader);
Zach Johnsond18cfd02014-08-04 20:51:06 -070062static void inbound_data_waiting(void *context);
63static void internal_outbound_read_ready(void *context);
64
65eager_reader_t *eager_reader_new(
66 int fd_to_read,
67 const allocator_t *allocator,
68 size_t buffer_size,
69 size_t max_buffer_count,
70 const char *thread_name) {
71
72 assert(fd_to_read != INVALID_FD);
73 assert(allocator != NULL);
74 assert(buffer_size > 0);
75 assert(max_buffer_count > 0);
76 assert(thread_name != NULL && *thread_name != '\0');
77
Zach Johnsonee2aa452014-08-26 20:16:03 -070078 eager_reader_t *ret = osi_calloc(sizeof(eager_reader_t));
Zach Johnsond18cfd02014-08-04 20:51:06 -070079
80 ret->allocator = allocator;
81 ret->inbound_fd = fd_to_read;
82
Sharvil Nanavati08019f32015-06-13 02:12:08 -070083 ret->bytes_available_fd = eventfd(0, 0);
Zach Johnsond18cfd02014-08-04 20:51:06 -070084 if (ret->bytes_available_fd == INVALID_FD) {
Marie Janssendb554582015-06-26 14:53:46 -070085 LOG_ERROR(LOG_TAG, "%s unable to create output reading semaphore.", __func__);
Zach Johnsond18cfd02014-08-04 20:51:06 -070086 goto error;
87 }
88
89 ret->buffer_size = buffer_size;
90
91 ret->buffers = fixed_queue_new(max_buffer_count);
92 if (!ret->buffers) {
Marie Janssendb554582015-06-26 14:53:46 -070093 LOG_ERROR(LOG_TAG, "%s unable to create buffers queue.", __func__);
Zach Johnsond18cfd02014-08-04 20:51:06 -070094 goto error;
95 }
96
97 ret->inbound_read_thread = thread_new(thread_name);
98 if (!ret->inbound_read_thread) {
Marie Janssendb554582015-06-26 14:53:46 -070099 LOG_ERROR(LOG_TAG, "%s unable to make reading thread.", __func__);
Zach Johnsond18cfd02014-08-04 20:51:06 -0700100 goto error;
101 }
102
103 ret->inbound_read_object = reactor_register(
104 thread_get_reactor(ret->inbound_read_thread),
105 fd_to_read,
106 ret,
107 inbound_data_waiting,
108 NULL
109 );
110
111 return ret;
112
113error:;
114 eager_reader_free(ret);
115 return NULL;
116}
117
118void eager_reader_free(eager_reader_t *reader) {
119 if (!reader)
120 return;
121
122 eager_reader_unregister(reader);
123
124 // Only unregister from the input if we actually did register
125 if (reader->inbound_read_object)
126 reactor_unregister(reader->inbound_read_object);
127
128 if (reader->bytes_available_fd != INVALID_FD)
129 close(reader->bytes_available_fd);
130
131 // Free the current buffer, because it's not in the queue
132 // and won't be freed below
133 if (reader->current_buffer)
134 reader->allocator->free(reader->current_buffer);
135
136 fixed_queue_free(reader->buffers, reader->allocator->free);
137 thread_free(reader->inbound_read_thread);
Zach Johnsonee2aa452014-08-26 20:16:03 -0700138 osi_free(reader);
Zach Johnsond18cfd02014-08-04 20:51:06 -0700139}
140
141void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context) {
142 assert(reader != NULL);
143 assert(reactor != NULL);
144 assert(read_cb != NULL);
145
146 // Make sure the reader isn't currently registered.
147 eager_reader_unregister(reader);
148
149 reader->outbound_read_ready = read_cb;
150 reader->outbound_context = context;
151 reader->outbound_registration = reactor_register(reactor, reader->bytes_available_fd, reader, internal_outbound_read_ready, NULL);
152}
153
154void eager_reader_unregister(eager_reader_t *reader) {
155 assert(reader != NULL);
156
157 if (reader->outbound_registration) {
158 reactor_unregister(reader->outbound_registration);
159 reader->outbound_registration = NULL;
160 }
161}
162
163// SEE HEADER FOR THREAD SAFETY NOTE
Andre Eisenbachb9757ee2015-11-20 14:07:24 -0800164size_t eager_reader_read(eager_reader_t *reader, uint8_t *buffer, size_t max_size) {
Zach Johnsond18cfd02014-08-04 20:51:06 -0700165 assert(reader != NULL);
Zach Johnsonbb170c12014-08-21 21:00:43 -0700166 assert(buffer != NULL);
Zach Johnsond18cfd02014-08-04 20:51:06 -0700167
Andre Eisenbachb9757ee2015-11-20 14:07:24 -0800168 // Poll to see if we have any bytes available before reading.
169 if (!has_byte(reader))
Sharvil Nanavati08019f32015-06-13 02:12:08 -0700170 return 0;
Zach Johnsond18cfd02014-08-04 20:51:06 -0700171
Sharvil Nanavati08019f32015-06-13 02:12:08 -0700172 // Find out how many bytes we have available in our various buffers.
173 eventfd_t bytes_available;
174 if (eventfd_read(reader->bytes_available_fd, &bytes_available) == -1) {
Marie Janssendb554582015-06-26 14:53:46 -0700175 LOG_ERROR(LOG_TAG, "%s unable to read semaphore for output data.", __func__);
Sharvil Nanavati08019f32015-06-13 02:12:08 -0700176 return 0;
177 }
Zach Johnsond18cfd02014-08-04 20:51:06 -0700178
Sharvil Nanavati08019f32015-06-13 02:12:08 -0700179 if (max_size > bytes_available)
180 max_size = bytes_available;
Zach Johnsond18cfd02014-08-04 20:51:06 -0700181
Sharvil Nanavati08019f32015-06-13 02:12:08 -0700182 size_t bytes_consumed = 0;
183 while (bytes_consumed < max_size) {
Zach Johnsonbb170c12014-08-21 21:00:43 -0700184 if (!reader->current_buffer)
185 reader->current_buffer = fixed_queue_dequeue(reader->buffers);
186
Sharvil Nanavati08019f32015-06-13 02:12:08 -0700187 size_t bytes_to_copy = reader->current_buffer->length - reader->current_buffer->offset;
188 if (bytes_to_copy > (max_size - bytes_consumed))
189 bytes_to_copy = max_size - bytes_consumed;
Zach Johnsonbb170c12014-08-21 21:00:43 -0700190
Sharvil Nanavati08019f32015-06-13 02:12:08 -0700191 memcpy(&buffer[bytes_consumed], &reader->current_buffer->data[reader->current_buffer->offset], bytes_to_copy);
192 bytes_consumed += bytes_to_copy;
193 reader->current_buffer->offset += bytes_to_copy;
194
Zach Johnsonbb170c12014-08-21 21:00:43 -0700195 if (reader->current_buffer->offset >= reader->current_buffer->length) {
196 reader->allocator->free(reader->current_buffer);
197 reader->current_buffer = NULL;
198 }
Zach Johnsond18cfd02014-08-04 20:51:06 -0700199 }
200
Sharvil Nanavati08019f32015-06-13 02:12:08 -0700201 bytes_available -= bytes_consumed;
202 if (eventfd_write(reader->bytes_available_fd, bytes_available) == -1) {
Marie Janssendb554582015-06-26 14:53:46 -0700203 LOG_ERROR(LOG_TAG, "%s unable to write back bytes available for output data.", __func__);
Sharvil Nanavati08019f32015-06-13 02:12:08 -0700204 }
205
206 return bytes_consumed;
Zach Johnsond18cfd02014-08-04 20:51:06 -0700207}
208
Andre Eisenbach6c25b3c2015-10-07 11:16:37 -0700209thread_t* eager_reader_get_read_thread(const eager_reader_t *reader) {
210 assert(reader != NULL);
211 return reader->inbound_read_thread;
212}
213
Zach Johnsonbb170c12014-08-21 21:00:43 -0700214static bool has_byte(const eager_reader_t *reader) {
Zach Johnsond18cfd02014-08-04 20:51:06 -0700215 assert(reader != NULL);
216
217 fd_set read_fds;
Zach Johnsond18cfd02014-08-04 20:51:06 -0700218
Pavlin Radoslavov574dcfb2016-05-12 11:36:44 -0700219 for (;;) {
220 FD_ZERO(&read_fds);
221 FD_SET(reader->bytes_available_fd, &read_fds);
Zach Johnsond18cfd02014-08-04 20:51:06 -0700222
Pavlin Radoslavov574dcfb2016-05-12 11:36:44 -0700223 // Immediate timeout
224 struct timeval timeout;
225 timeout.tv_sec = 0;
226 timeout.tv_usec = 0;
227
228 int ret = select(reader->bytes_available_fd + 1, &read_fds, NULL, NULL,
229 &timeout);
230 if (ret == -1 && errno == EINTR)
231 continue;
232 break;
233 }
234
Zach Johnsond18cfd02014-08-04 20:51:06 -0700235 return FD_ISSET(reader->bytes_available_fd, &read_fds);
236}
237
238static void inbound_data_waiting(void *context) {
239 eager_reader_t *reader = (eager_reader_t *)context;
240
241 data_buffer_t *buffer = (data_buffer_t *)reader->allocator->alloc(reader->buffer_size + sizeof(data_buffer_t));
242 if (!buffer) {
Marie Janssendb554582015-06-26 14:53:46 -0700243 LOG_ERROR(LOG_TAG, "%s couldn't aquire memory for inbound data buffer.", __func__);
Zach Johnsond18cfd02014-08-04 20:51:06 -0700244 return;
245 }
246
247 buffer->length = 0;
248 buffer->offset = 0;
249
Pavlin Radoslavov574dcfb2016-05-12 11:36:44 -0700250 ssize_t bytes_read;
251 OSI_NO_INTR(bytes_read = read(reader->inbound_fd, buffer->data,
252 reader->buffer_size));
Zach Johnsond18cfd02014-08-04 20:51:06 -0700253 if (bytes_read > 0) {
254 // Save the data for later
255 buffer->length = bytes_read;
256 fixed_queue_enqueue(reader->buffers, buffer);
257
258 // Tell consumers data is available by incrementing
259 // the semaphore by the number of bytes we just read
260 eventfd_write(reader->bytes_available_fd, bytes_read);
261 } else {
262 if (bytes_read == 0)
Marie Janssendb554582015-06-26 14:53:46 -0700263 LOG_WARN(LOG_TAG, "%s fd said bytes existed, but none were found.", __func__);
Zach Johnsond18cfd02014-08-04 20:51:06 -0700264 else
Marie Janssendb554582015-06-26 14:53:46 -0700265 LOG_WARN(LOG_TAG, "%s unable to read from file descriptor: %s", __func__, strerror(errno));
Zach Johnsond18cfd02014-08-04 20:51:06 -0700266
267 reader->allocator->free(buffer);
268 }
269}
270
271static void internal_outbound_read_ready(void *context) {
272 assert(context != NULL);
273
274 eager_reader_t *reader = (eager_reader_t *)context;
275 reader->outbound_read_ready(reader, reader->outbound_context);
276}