Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 1 | /****************************************************************************** |
| 2 | * |
| 3 | * Copyright (C) 2014 Google, Inc. |
| 4 | * |
| 5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | * you may not use this file except in compliance with the License. |
| 7 | * You may obtain a copy of the License at: |
| 8 | * |
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, software |
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | * See the License for the specific language governing permissions and |
| 15 | * limitations under the License. |
| 16 | * |
| 17 | ******************************************************************************/ |
| 18 | |
Chris Manton | f802700 | 2015-03-12 09:22:48 -0700 | [diff] [blame] | 19 | #define LOG_TAG "bt_osi_eager_reader" |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 20 | |
Marie Janssen | 49a8670 | 2015-07-08 11:48:57 -0700 | [diff] [blame] | 21 | #include "osi/include/eager_reader.h" |
| 22 | |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 23 | #include <assert.h> |
| 24 | #include <errno.h> |
Etan Cohen | 3e59b5b | 2015-03-31 17:15:53 -0700 | [diff] [blame] | 25 | #include <string.h> |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 26 | #include <sys/eventfd.h> |
Marie Janssen | 49a8670 | 2015-07-08 11:48:57 -0700 | [diff] [blame] | 27 | #include <unistd.h> |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 28 | |
Sharvil Nanavati | 0f9b91e | 2015-03-12 15:42:50 -0700 | [diff] [blame] | 29 | #include "osi/include/fixed_queue.h" |
Sharvil Nanavati | 4480276 | 2014-12-23 23:08:58 -0800 | [diff] [blame] | 30 | #include "osi/include/log.h" |
Marie Janssen | 49a8670 | 2015-07-08 11:48:57 -0700 | [diff] [blame] | 31 | #include "osi/include/osi.h" |
Sharvil Nanavati | 0f9b91e | 2015-03-12 15:42:50 -0700 | [diff] [blame] | 32 | #include "osi/include/reactor.h" |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 33 | |
| 34 | #if !defined(EFD_SEMAPHORE) |
| 35 | # define EFD_SEMAPHORE (1 << 0) |
| 36 | #endif |
| 37 | |
| 38 | typedef struct { |
| 39 | size_t length; |
| 40 | size_t offset; |
| 41 | uint8_t data[]; |
| 42 | } data_buffer_t; |
| 43 | |
| 44 | struct eager_reader_t { |
| 45 | int bytes_available_fd; // semaphore mode eventfd which counts the number of available bytes |
| 46 | int inbound_fd; |
| 47 | |
| 48 | const allocator_t *allocator; |
| 49 | size_t buffer_size; |
| 50 | fixed_queue_t *buffers; |
| 51 | data_buffer_t *current_buffer; |
| 52 | |
| 53 | thread_t *inbound_read_thread; |
| 54 | reactor_object_t *inbound_read_object; |
| 55 | |
| 56 | reactor_object_t *outbound_registration; |
| 57 | eager_reader_cb outbound_read_ready; |
| 58 | void *outbound_context; |
| 59 | }; |
| 60 | |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 61 | static bool has_byte(const eager_reader_t *reader); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 62 | static void inbound_data_waiting(void *context); |
| 63 | static void internal_outbound_read_ready(void *context); |
| 64 | |
| 65 | eager_reader_t *eager_reader_new( |
| 66 | int fd_to_read, |
| 67 | const allocator_t *allocator, |
| 68 | size_t buffer_size, |
| 69 | size_t max_buffer_count, |
| 70 | const char *thread_name) { |
| 71 | |
| 72 | assert(fd_to_read != INVALID_FD); |
| 73 | assert(allocator != NULL); |
| 74 | assert(buffer_size > 0); |
| 75 | assert(max_buffer_count > 0); |
| 76 | assert(thread_name != NULL && *thread_name != '\0'); |
| 77 | |
Zach Johnson | ee2aa45 | 2014-08-26 20:16:03 -0700 | [diff] [blame] | 78 | eager_reader_t *ret = osi_calloc(sizeof(eager_reader_t)); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 79 | |
| 80 | ret->allocator = allocator; |
| 81 | ret->inbound_fd = fd_to_read; |
| 82 | |
Sharvil Nanavati | 08019f3 | 2015-06-13 02:12:08 -0700 | [diff] [blame] | 83 | ret->bytes_available_fd = eventfd(0, 0); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 84 | if (ret->bytes_available_fd == INVALID_FD) { |
Marie Janssen | db55458 | 2015-06-26 14:53:46 -0700 | [diff] [blame] | 85 | LOG_ERROR(LOG_TAG, "%s unable to create output reading semaphore.", __func__); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 86 | goto error; |
| 87 | } |
| 88 | |
| 89 | ret->buffer_size = buffer_size; |
| 90 | |
| 91 | ret->buffers = fixed_queue_new(max_buffer_count); |
| 92 | if (!ret->buffers) { |
Marie Janssen | db55458 | 2015-06-26 14:53:46 -0700 | [diff] [blame] | 93 | LOG_ERROR(LOG_TAG, "%s unable to create buffers queue.", __func__); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 94 | goto error; |
| 95 | } |
| 96 | |
| 97 | ret->inbound_read_thread = thread_new(thread_name); |
| 98 | if (!ret->inbound_read_thread) { |
Marie Janssen | db55458 | 2015-06-26 14:53:46 -0700 | [diff] [blame] | 99 | LOG_ERROR(LOG_TAG, "%s unable to make reading thread.", __func__); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 100 | goto error; |
| 101 | } |
| 102 | |
| 103 | ret->inbound_read_object = reactor_register( |
| 104 | thread_get_reactor(ret->inbound_read_thread), |
| 105 | fd_to_read, |
| 106 | ret, |
| 107 | inbound_data_waiting, |
| 108 | NULL |
| 109 | ); |
| 110 | |
| 111 | return ret; |
| 112 | |
| 113 | error:; |
| 114 | eager_reader_free(ret); |
| 115 | return NULL; |
| 116 | } |
| 117 | |
| 118 | void eager_reader_free(eager_reader_t *reader) { |
| 119 | if (!reader) |
| 120 | return; |
| 121 | |
| 122 | eager_reader_unregister(reader); |
| 123 | |
| 124 | // Only unregister from the input if we actually did register |
| 125 | if (reader->inbound_read_object) |
| 126 | reactor_unregister(reader->inbound_read_object); |
| 127 | |
| 128 | if (reader->bytes_available_fd != INVALID_FD) |
| 129 | close(reader->bytes_available_fd); |
| 130 | |
| 131 | // Free the current buffer, because it's not in the queue |
| 132 | // and won't be freed below |
| 133 | if (reader->current_buffer) |
| 134 | reader->allocator->free(reader->current_buffer); |
| 135 | |
| 136 | fixed_queue_free(reader->buffers, reader->allocator->free); |
| 137 | thread_free(reader->inbound_read_thread); |
Zach Johnson | ee2aa45 | 2014-08-26 20:16:03 -0700 | [diff] [blame] | 138 | osi_free(reader); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 139 | } |
| 140 | |
| 141 | void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context) { |
| 142 | assert(reader != NULL); |
| 143 | assert(reactor != NULL); |
| 144 | assert(read_cb != NULL); |
| 145 | |
| 146 | // Make sure the reader isn't currently registered. |
| 147 | eager_reader_unregister(reader); |
| 148 | |
| 149 | reader->outbound_read_ready = read_cb; |
| 150 | reader->outbound_context = context; |
| 151 | reader->outbound_registration = reactor_register(reactor, reader->bytes_available_fd, reader, internal_outbound_read_ready, NULL); |
| 152 | } |
| 153 | |
| 154 | void eager_reader_unregister(eager_reader_t *reader) { |
| 155 | assert(reader != NULL); |
| 156 | |
| 157 | if (reader->outbound_registration) { |
| 158 | reactor_unregister(reader->outbound_registration); |
| 159 | reader->outbound_registration = NULL; |
| 160 | } |
| 161 | } |
| 162 | |
| 163 | // SEE HEADER FOR THREAD SAFETY NOTE |
Andre Eisenbach | b9757ee | 2015-11-20 14:07:24 -0800 | [diff] [blame] | 164 | size_t eager_reader_read(eager_reader_t *reader, uint8_t *buffer, size_t max_size) { |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 165 | assert(reader != NULL); |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 166 | assert(buffer != NULL); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 167 | |
Andre Eisenbach | b9757ee | 2015-11-20 14:07:24 -0800 | [diff] [blame] | 168 | // Poll to see if we have any bytes available before reading. |
| 169 | if (!has_byte(reader)) |
Sharvil Nanavati | 08019f3 | 2015-06-13 02:12:08 -0700 | [diff] [blame] | 170 | return 0; |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 171 | |
Sharvil Nanavati | 08019f3 | 2015-06-13 02:12:08 -0700 | [diff] [blame] | 172 | // Find out how many bytes we have available in our various buffers. |
| 173 | eventfd_t bytes_available; |
| 174 | if (eventfd_read(reader->bytes_available_fd, &bytes_available) == -1) { |
Marie Janssen | db55458 | 2015-06-26 14:53:46 -0700 | [diff] [blame] | 175 | LOG_ERROR(LOG_TAG, "%s unable to read semaphore for output data.", __func__); |
Sharvil Nanavati | 08019f3 | 2015-06-13 02:12:08 -0700 | [diff] [blame] | 176 | return 0; |
| 177 | } |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 178 | |
Sharvil Nanavati | 08019f3 | 2015-06-13 02:12:08 -0700 | [diff] [blame] | 179 | if (max_size > bytes_available) |
| 180 | max_size = bytes_available; |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 181 | |
Sharvil Nanavati | 08019f3 | 2015-06-13 02:12:08 -0700 | [diff] [blame] | 182 | size_t bytes_consumed = 0; |
| 183 | while (bytes_consumed < max_size) { |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 184 | if (!reader->current_buffer) |
| 185 | reader->current_buffer = fixed_queue_dequeue(reader->buffers); |
| 186 | |
Sharvil Nanavati | 08019f3 | 2015-06-13 02:12:08 -0700 | [diff] [blame] | 187 | size_t bytes_to_copy = reader->current_buffer->length - reader->current_buffer->offset; |
| 188 | if (bytes_to_copy > (max_size - bytes_consumed)) |
| 189 | bytes_to_copy = max_size - bytes_consumed; |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 190 | |
Sharvil Nanavati | 08019f3 | 2015-06-13 02:12:08 -0700 | [diff] [blame] | 191 | memcpy(&buffer[bytes_consumed], &reader->current_buffer->data[reader->current_buffer->offset], bytes_to_copy); |
| 192 | bytes_consumed += bytes_to_copy; |
| 193 | reader->current_buffer->offset += bytes_to_copy; |
| 194 | |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 195 | if (reader->current_buffer->offset >= reader->current_buffer->length) { |
| 196 | reader->allocator->free(reader->current_buffer); |
| 197 | reader->current_buffer = NULL; |
| 198 | } |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 199 | } |
| 200 | |
Sharvil Nanavati | 08019f3 | 2015-06-13 02:12:08 -0700 | [diff] [blame] | 201 | bytes_available -= bytes_consumed; |
| 202 | if (eventfd_write(reader->bytes_available_fd, bytes_available) == -1) { |
Marie Janssen | db55458 | 2015-06-26 14:53:46 -0700 | [diff] [blame] | 203 | LOG_ERROR(LOG_TAG, "%s unable to write back bytes available for output data.", __func__); |
Sharvil Nanavati | 08019f3 | 2015-06-13 02:12:08 -0700 | [diff] [blame] | 204 | } |
| 205 | |
| 206 | return bytes_consumed; |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 207 | } |
| 208 | |
Andre Eisenbach | 6c25b3c | 2015-10-07 11:16:37 -0700 | [diff] [blame] | 209 | thread_t* eager_reader_get_read_thread(const eager_reader_t *reader) { |
| 210 | assert(reader != NULL); |
| 211 | return reader->inbound_read_thread; |
| 212 | } |
| 213 | |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 214 | static bool has_byte(const eager_reader_t *reader) { |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 215 | assert(reader != NULL); |
| 216 | |
| 217 | fd_set read_fds; |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 218 | |
Pavlin Radoslavov | 574dcfb | 2016-05-12 11:36:44 -0700 | [diff] [blame^] | 219 | for (;;) { |
| 220 | FD_ZERO(&read_fds); |
| 221 | FD_SET(reader->bytes_available_fd, &read_fds); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 222 | |
Pavlin Radoslavov | 574dcfb | 2016-05-12 11:36:44 -0700 | [diff] [blame^] | 223 | // Immediate timeout |
| 224 | struct timeval timeout; |
| 225 | timeout.tv_sec = 0; |
| 226 | timeout.tv_usec = 0; |
| 227 | |
| 228 | int ret = select(reader->bytes_available_fd + 1, &read_fds, NULL, NULL, |
| 229 | &timeout); |
| 230 | if (ret == -1 && errno == EINTR) |
| 231 | continue; |
| 232 | break; |
| 233 | } |
| 234 | |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 235 | return FD_ISSET(reader->bytes_available_fd, &read_fds); |
| 236 | } |
| 237 | |
| 238 | static void inbound_data_waiting(void *context) { |
| 239 | eager_reader_t *reader = (eager_reader_t *)context; |
| 240 | |
| 241 | data_buffer_t *buffer = (data_buffer_t *)reader->allocator->alloc(reader->buffer_size + sizeof(data_buffer_t)); |
| 242 | if (!buffer) { |
Marie Janssen | db55458 | 2015-06-26 14:53:46 -0700 | [diff] [blame] | 243 | LOG_ERROR(LOG_TAG, "%s couldn't aquire memory for inbound data buffer.", __func__); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 244 | return; |
| 245 | } |
| 246 | |
| 247 | buffer->length = 0; |
| 248 | buffer->offset = 0; |
| 249 | |
Pavlin Radoslavov | 574dcfb | 2016-05-12 11:36:44 -0700 | [diff] [blame^] | 250 | ssize_t bytes_read; |
| 251 | OSI_NO_INTR(bytes_read = read(reader->inbound_fd, buffer->data, |
| 252 | reader->buffer_size)); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 253 | if (bytes_read > 0) { |
| 254 | // Save the data for later |
| 255 | buffer->length = bytes_read; |
| 256 | fixed_queue_enqueue(reader->buffers, buffer); |
| 257 | |
| 258 | // Tell consumers data is available by incrementing |
| 259 | // the semaphore by the number of bytes we just read |
| 260 | eventfd_write(reader->bytes_available_fd, bytes_read); |
| 261 | } else { |
| 262 | if (bytes_read == 0) |
Marie Janssen | db55458 | 2015-06-26 14:53:46 -0700 | [diff] [blame] | 263 | LOG_WARN(LOG_TAG, "%s fd said bytes existed, but none were found.", __func__); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 264 | else |
Marie Janssen | db55458 | 2015-06-26 14:53:46 -0700 | [diff] [blame] | 265 | LOG_WARN(LOG_TAG, "%s unable to read from file descriptor: %s", __func__, strerror(errno)); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 266 | |
| 267 | reader->allocator->free(buffer); |
| 268 | } |
| 269 | } |
| 270 | |
| 271 | static void internal_outbound_read_ready(void *context) { |
| 272 | assert(context != NULL); |
| 273 | |
| 274 | eager_reader_t *reader = (eager_reader_t *)context; |
| 275 | reader->outbound_read_ready(reader, reader->outbound_context); |
| 276 | } |