Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 1 | /****************************************************************************** |
| 2 | * |
| 3 | * Copyright (C) 2014 Google, Inc. |
| 4 | * |
| 5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | * you may not use this file except in compliance with the License. |
| 7 | * You may obtain a copy of the License at: |
| 8 | * |
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, software |
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | * See the License for the specific language governing permissions and |
| 15 | * limitations under the License. |
| 16 | * |
| 17 | ******************************************************************************/ |
| 18 | |
Chris Manton | f802700 | 2015-03-12 09:22:48 -0700 | [diff] [blame] | 19 | #define LOG_TAG "bt_osi_eager_reader" |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 20 | |
| 21 | #include <assert.h> |
| 22 | #include <errno.h> |
| 23 | #include <stddef.h> |
Etan Cohen | 3e59b5b | 2015-03-31 17:15:53 -0700 | [diff] [blame] | 24 | #include <string.h> |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 25 | #include <sys/eventfd.h> |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 26 | |
Sharvil Nanavati | 0f9b91e | 2015-03-12 15:42:50 -0700 | [diff] [blame] | 27 | #include "osi/include/allocator.h" |
| 28 | #include "osi/include/eager_reader.h" |
| 29 | #include "osi/include/fixed_queue.h" |
| 30 | #include "osi/include/osi.h" |
Sharvil Nanavati | 4480276 | 2014-12-23 23:08:58 -0800 | [diff] [blame] | 31 | #include "osi/include/log.h" |
Sharvil Nanavati | 0f9b91e | 2015-03-12 15:42:50 -0700 | [diff] [blame] | 32 | #include "osi/include/reactor.h" |
| 33 | #include "osi/include/thread.h" |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 34 | |
// Supply a fallback definition when <sys/eventfd.h> does not provide
// EFD_SEMAPHORE, so the eventfd() call below still compiles.
#ifndef EFD_SEMAPHORE
#define EFD_SEMAPHORE (1 << 0)
#endif
| 38 | |
// One chunk of inbound data. The header and payload live in a single
// allocation: the payload follows the header as a flexible array member.
typedef struct {
  size_t length;   // number of valid bytes stored in |data|
  size_t offset;   // index of the next byte in |data| to hand to the consumer
  uint8_t data[];  // payload bytes filled in from the inbound fd
} data_buffer_t;
| 44 | |
| 45 | struct eager_reader_t { |
| 46 | int bytes_available_fd; // semaphore mode eventfd which counts the number of available bytes |
| 47 | int inbound_fd; |
| 48 | |
| 49 | const allocator_t *allocator; |
| 50 | size_t buffer_size; |
| 51 | fixed_queue_t *buffers; |
| 52 | data_buffer_t *current_buffer; |
| 53 | |
| 54 | thread_t *inbound_read_thread; |
| 55 | reactor_object_t *inbound_read_object; |
| 56 | |
| 57 | reactor_object_t *outbound_registration; |
| 58 | eager_reader_cb outbound_read_ready; |
| 59 | void *outbound_context; |
| 60 | }; |
| 61 | |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 62 | static bool has_byte(const eager_reader_t *reader); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 63 | static void inbound_data_waiting(void *context); |
| 64 | static void internal_outbound_read_ready(void *context); |
| 65 | |
| 66 | eager_reader_t *eager_reader_new( |
| 67 | int fd_to_read, |
| 68 | const allocator_t *allocator, |
| 69 | size_t buffer_size, |
| 70 | size_t max_buffer_count, |
| 71 | const char *thread_name) { |
| 72 | |
| 73 | assert(fd_to_read != INVALID_FD); |
| 74 | assert(allocator != NULL); |
| 75 | assert(buffer_size > 0); |
| 76 | assert(max_buffer_count > 0); |
| 77 | assert(thread_name != NULL && *thread_name != '\0'); |
| 78 | |
Zach Johnson | ee2aa45 | 2014-08-26 20:16:03 -0700 | [diff] [blame] | 79 | eager_reader_t *ret = osi_calloc(sizeof(eager_reader_t)); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 80 | if (!ret) { |
Sharvil Nanavati | 4480276 | 2014-12-23 23:08:58 -0800 | [diff] [blame] | 81 | LOG_ERROR("%s unable to allocate memory for new eager_reader.", __func__); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 82 | goto error; |
| 83 | } |
| 84 | |
| 85 | ret->allocator = allocator; |
| 86 | ret->inbound_fd = fd_to_read; |
| 87 | |
| 88 | ret->bytes_available_fd = eventfd(0, EFD_SEMAPHORE); |
| 89 | if (ret->bytes_available_fd == INVALID_FD) { |
Sharvil Nanavati | 4480276 | 2014-12-23 23:08:58 -0800 | [diff] [blame] | 90 | LOG_ERROR("%s unable to create output reading semaphore.", __func__); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 91 | goto error; |
| 92 | } |
| 93 | |
| 94 | ret->buffer_size = buffer_size; |
| 95 | |
| 96 | ret->buffers = fixed_queue_new(max_buffer_count); |
| 97 | if (!ret->buffers) { |
Sharvil Nanavati | 4480276 | 2014-12-23 23:08:58 -0800 | [diff] [blame] | 98 | LOG_ERROR("%s unable to create buffers queue.", __func__); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 99 | goto error; |
| 100 | } |
| 101 | |
| 102 | ret->inbound_read_thread = thread_new(thread_name); |
| 103 | if (!ret->inbound_read_thread) { |
Sharvil Nanavati | 4480276 | 2014-12-23 23:08:58 -0800 | [diff] [blame] | 104 | LOG_ERROR("%s unable to make reading thread.", __func__); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 105 | goto error; |
| 106 | } |
| 107 | |
| 108 | ret->inbound_read_object = reactor_register( |
| 109 | thread_get_reactor(ret->inbound_read_thread), |
| 110 | fd_to_read, |
| 111 | ret, |
| 112 | inbound_data_waiting, |
| 113 | NULL |
| 114 | ); |
| 115 | |
| 116 | return ret; |
| 117 | |
| 118 | error:; |
| 119 | eager_reader_free(ret); |
| 120 | return NULL; |
| 121 | } |
| 122 | |
| 123 | void eager_reader_free(eager_reader_t *reader) { |
| 124 | if (!reader) |
| 125 | return; |
| 126 | |
| 127 | eager_reader_unregister(reader); |
| 128 | |
| 129 | // Only unregister from the input if we actually did register |
| 130 | if (reader->inbound_read_object) |
| 131 | reactor_unregister(reader->inbound_read_object); |
| 132 | |
| 133 | if (reader->bytes_available_fd != INVALID_FD) |
| 134 | close(reader->bytes_available_fd); |
| 135 | |
| 136 | // Free the current buffer, because it's not in the queue |
| 137 | // and won't be freed below |
| 138 | if (reader->current_buffer) |
| 139 | reader->allocator->free(reader->current_buffer); |
| 140 | |
| 141 | fixed_queue_free(reader->buffers, reader->allocator->free); |
| 142 | thread_free(reader->inbound_read_thread); |
Zach Johnson | ee2aa45 | 2014-08-26 20:16:03 -0700 | [diff] [blame] | 143 | osi_free(reader); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 144 | } |
| 145 | |
| 146 | void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context) { |
| 147 | assert(reader != NULL); |
| 148 | assert(reactor != NULL); |
| 149 | assert(read_cb != NULL); |
| 150 | |
| 151 | // Make sure the reader isn't currently registered. |
| 152 | eager_reader_unregister(reader); |
| 153 | |
| 154 | reader->outbound_read_ready = read_cb; |
| 155 | reader->outbound_context = context; |
| 156 | reader->outbound_registration = reactor_register(reactor, reader->bytes_available_fd, reader, internal_outbound_read_ready, NULL); |
| 157 | } |
| 158 | |
| 159 | void eager_reader_unregister(eager_reader_t *reader) { |
| 160 | assert(reader != NULL); |
| 161 | |
| 162 | if (reader->outbound_registration) { |
| 163 | reactor_unregister(reader->outbound_registration); |
| 164 | reader->outbound_registration = NULL; |
| 165 | } |
| 166 | } |
| 167 | |
| 168 | // SEE HEADER FOR THREAD SAFETY NOTE |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 169 | size_t eager_reader_read(eager_reader_t *reader, uint8_t *buffer, size_t max_size, bool block) { |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 170 | assert(reader != NULL); |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 171 | assert(buffer != NULL); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 172 | |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 173 | size_t bytes_read = 0; |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 174 | |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 175 | while (bytes_read < max_size) { |
| 176 | if (!block && !has_byte(reader)) |
| 177 | return bytes_read; |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 178 | |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 179 | eventfd_t value; |
| 180 | if (eventfd_read(reader->bytes_available_fd, &value) == -1) |
Sharvil Nanavati | 4480276 | 2014-12-23 23:08:58 -0800 | [diff] [blame] | 181 | LOG_ERROR("%s unable to read semaphore for output data.", __func__); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 182 | |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 183 | if (!reader->current_buffer) |
| 184 | reader->current_buffer = fixed_queue_dequeue(reader->buffers); |
| 185 | |
| 186 | buffer[bytes_read] = reader->current_buffer->data[reader->current_buffer->offset]; |
| 187 | reader->current_buffer->offset++; |
| 188 | bytes_read++; |
| 189 | |
| 190 | // Prep for next byte |
| 191 | if (reader->current_buffer->offset >= reader->current_buffer->length) { |
| 192 | reader->allocator->free(reader->current_buffer); |
| 193 | reader->current_buffer = NULL; |
| 194 | } |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 195 | } |
| 196 | |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 197 | return bytes_read; |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 198 | } |
| 199 | |
Zach Johnson | bb170c1 | 2014-08-21 21:00:43 -0700 | [diff] [blame] | 200 | static bool has_byte(const eager_reader_t *reader) { |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 201 | assert(reader != NULL); |
| 202 | |
| 203 | fd_set read_fds; |
| 204 | FD_ZERO(&read_fds); |
| 205 | FD_SET(reader->bytes_available_fd, &read_fds); |
| 206 | |
| 207 | // Immediate timeout |
| 208 | struct timeval timeout; |
| 209 | timeout.tv_sec = 0; |
| 210 | timeout.tv_usec = 0; |
| 211 | |
| 212 | select(reader->bytes_available_fd + 1, &read_fds, NULL, NULL, &timeout); |
| 213 | return FD_ISSET(reader->bytes_available_fd, &read_fds); |
| 214 | } |
| 215 | |
| 216 | static void inbound_data_waiting(void *context) { |
| 217 | eager_reader_t *reader = (eager_reader_t *)context; |
| 218 | |
| 219 | data_buffer_t *buffer = (data_buffer_t *)reader->allocator->alloc(reader->buffer_size + sizeof(data_buffer_t)); |
| 220 | if (!buffer) { |
Sharvil Nanavati | 4480276 | 2014-12-23 23:08:58 -0800 | [diff] [blame] | 221 | LOG_ERROR("%s couldn't aquire memory for inbound data buffer.", __func__); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 222 | return; |
| 223 | } |
| 224 | |
| 225 | buffer->length = 0; |
| 226 | buffer->offset = 0; |
| 227 | |
| 228 | int bytes_read = read(reader->inbound_fd, buffer->data, reader->buffer_size); |
| 229 | if (bytes_read > 0) { |
| 230 | // Save the data for later |
| 231 | buffer->length = bytes_read; |
| 232 | fixed_queue_enqueue(reader->buffers, buffer); |
| 233 | |
| 234 | // Tell consumers data is available by incrementing |
| 235 | // the semaphore by the number of bytes we just read |
| 236 | eventfd_write(reader->bytes_available_fd, bytes_read); |
| 237 | } else { |
| 238 | if (bytes_read == 0) |
Sharvil Nanavati | 4480276 | 2014-12-23 23:08:58 -0800 | [diff] [blame] | 239 | LOG_WARN("%s fd said bytes existed, but none were found.", __func__); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 240 | else |
Sharvil Nanavati | 4480276 | 2014-12-23 23:08:58 -0800 | [diff] [blame] | 241 | LOG_WARN("%s unable to read from file descriptor: %s", __func__, strerror(errno)); |
Zach Johnson | d18cfd0 | 2014-08-04 20:51:06 -0700 | [diff] [blame] | 242 | |
| 243 | reader->allocator->free(buffer); |
| 244 | } |
| 245 | } |
| 246 | |
| 247 | static void internal_outbound_read_ready(void *context) { |
| 248 | assert(context != NULL); |
| 249 | |
| 250 | eager_reader_t *reader = (eager_reader_t *)context; |
| 251 | reader->outbound_read_ready(reader, reader->outbound_context); |
| 252 | } |