Andy Hung | e7937b9 | 2019-08-28 21:02:23 -0700 | [diff] [blame] | 1 | /* |
| 2 | * Copyright (C) 2019 The Android Open Source Project |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | #pragma once |
| 18 | |
#include "Stream.h"

#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <future>
#include <iterator>
#include <limits>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_set>
#include <vector>

#include <utils/AndroidThreads.h>
| 31 | |
| 32 | namespace android::soundpool { |
| 33 | |
| 34 | // TODO: Move helper classes to a utility file, with separate test. |
| 35 | |
| 36 | /** |
| 37 | * JavaThread is used like std::thread but for threads that may call the JVM. |
| 38 | * |
| 39 | * std::thread does not easily attach to the JVM. We need JVM capable threads |
| 40 | * from createThreadEtc() since android binder call optimization may attempt to |
| 41 | * call back into Java if the SoundPool runs in system server. |
| 42 | * |
| 43 | * |
| 44 | * No locking is required - the member variables are inherently thread-safe. |
| 45 | */ |
| 46 | class JavaThread { |
| 47 | public: |
| 48 | JavaThread(std::function<void()> f, const char *name) |
| 49 | : mF{std::move(f)} { |
| 50 | createThreadEtc(staticFunction, this, name); |
| 51 | } |
| 52 | |
| 53 | JavaThread(JavaThread &&) = delete; // uses "this" ptr, not moveable. |
| 54 | |
Andy Hung | eaeced3 | 2020-03-03 13:40:49 -0800 | [diff] [blame] | 55 | ~JavaThread() { |
| 56 | join(); // manually block until the future is ready as std::future |
| 57 | // destructor doesn't block unless it comes from std::async |
| 58 | // and it is the last reference to shared state. |
| 59 | } |
| 60 | |
Andy Hung | e7937b9 | 2019-08-28 21:02:23 -0700 | [diff] [blame] | 61 | void join() const { |
| 62 | mFuture.wait(); |
| 63 | } |
| 64 | |
| 65 | bool isClosed() const { |
| 66 | return mIsClosed; |
| 67 | } |
| 68 | |
| 69 | private: |
| 70 | static int staticFunction(void *data) { |
| 71 | JavaThread *jt = static_cast<JavaThread *>(data); |
| 72 | jt->mF(); |
Andy Hung | 9bebf81 | 2020-03-10 19:06:42 -0700 | [diff] [blame] | 73 | jt->mIsClosed = true; // set the flag that we are closed |
| 74 | // now before we allow the destructor to execute; |
| 75 | // otherwise there may be a use after free. |
Andy Hung | e7937b9 | 2019-08-28 21:02:23 -0700 | [diff] [blame] | 76 | jt->mPromise.set_value(); |
Andy Hung | e7937b9 | 2019-08-28 21:02:23 -0700 | [diff] [blame] | 77 | return 0; |
| 78 | } |
| 79 | |
| 80 | // No locking is provided as these variables are initialized in the constructor |
| 81 | // and the members referenced are thread-safe objects. |
| 82 | // (mFuture.wait() can block multiple threads.) |
| 83 | // Note the order of member variables is reversed for destructor. |
| 84 | const std::function<void()> mF; |
| 85 | // Used in join() to block until the thread completes. |
| 86 | // See https://en.cppreference.com/w/cpp/thread/promise for the void specialization of |
| 87 | // promise. |
| 88 | std::promise<void> mPromise; |
| 89 | std::future<void> mFuture{mPromise.get_future()}; |
| 90 | std::atomic_bool mIsClosed = false; |
| 91 | }; |
| 92 | |
| 93 | /** |
| 94 | * The ThreadPool manages thread lifetimes of SoundPool worker threads. |
| 95 | * |
| 96 | * TODO: the (eventual) goal of ThreadPool is to transparently and cooperatively |
| 97 | * maximize CPU utilization while avoiding starvation of other applications. |
| 98 | * Some possibilities: |
| 99 | * |
| 100 | * We should create worker threads when we have SoundPool work and the system is idle. |
| 101 | * CPU cycles are "use-it-or-lose-it" when the system is idle. |
| 102 | * |
| 103 | * We should adjust the priority of worker threads so that the second (and subsequent) worker |
| 104 | * threads have lower priority (should we try to promote priority also?). |
| 105 | * |
| 106 | * We should throttle the spawning of new worker threads, spacing over time, to avoid |
| 107 | * creating too many new threads all at once, on initialization. |
| 108 | */ |
| 109 | class ThreadPool { |
| 110 | public: |
| 111 | ThreadPool(size_t maxThreadCount, std::string name) |
| 112 | : mMaxThreadCount(maxThreadCount) |
| 113 | , mName{std::move(name)} { } |
| 114 | |
| 115 | ~ThreadPool() { quit(); } |
| 116 | |
| 117 | size_t getActiveThreadCount() const { return mActiveThreadCount; } |
| 118 | size_t getMaxThreadCount() const { return mMaxThreadCount; } |
| 119 | |
| 120 | void quit() { |
| 121 | std::list<std::unique_ptr<JavaThread>> threads; |
| 122 | { |
| 123 | std::lock_guard lock(mThreadLock); |
| 124 | if (mQuit) return; // already joined. |
| 125 | mQuit = true; |
| 126 | threads = std::move(mThreads); |
| 127 | mThreads.clear(); |
| 128 | } |
| 129 | // mQuit set under lock, no more threads will be created. |
| 130 | for (auto &thread : threads) { |
| 131 | thread->join(); |
| 132 | thread.reset(); |
| 133 | } |
| 134 | LOG_ALWAYS_FATAL_IF(mActiveThreadCount != 0, |
| 135 | "Invalid Active Threads: %zu", (size_t)mActiveThreadCount); |
| 136 | } |
| 137 | |
| 138 | // returns a non-zero id if successful, the id is to help logging messages. |
| 139 | int32_t launch(std::function<void(int32_t /* id */)> f) { |
| 140 | std::list<std::unique_ptr<JavaThread>> threadsToRelease; // release outside of lock. |
| 141 | std::lock_guard lock(mThreadLock); |
| 142 | if (mQuit) return 0; // ignore if we have quit |
| 143 | |
| 144 | // clean up threads. |
| 145 | for (auto it = mThreads.begin(); it != mThreads.end(); ) { |
| 146 | if ((*it)->isClosed()) { |
| 147 | threadsToRelease.emplace_back(std::move(*it)); |
| 148 | it = mThreads.erase(it); |
| 149 | } else { |
| 150 | ++it; |
| 151 | } |
| 152 | } |
| 153 | |
| 154 | const size_t threadCount = mThreads.size(); |
| 155 | if (threadCount < mMaxThreadCount) { |
| 156 | // if the id wraps, we don't care about collisions. it's just for logging. |
| 157 | mNextThreadId = mNextThreadId == INT32_MAX ? 1 : ++mNextThreadId; |
| 158 | const int32_t id = mNextThreadId; |
| 159 | mThreads.emplace_back(std::make_unique<JavaThread>( |
| 160 | [this, id, mf = std::move(f)] { mf(id); --mActiveThreadCount; }, |
| 161 | (mName + std::to_string(id)).c_str())); |
| 162 | ++mActiveThreadCount; |
| 163 | return id; |
| 164 | } |
| 165 | return 0; |
| 166 | } |
| 167 | |
| 168 | // TODO: launch only if load average is low. |
| 169 | // This gets the load average |
| 170 | // See also std::thread::hardware_concurrency() for the concurrent capability. |
| 171 | static double getLoadAvg() { |
| 172 | double loadAvg[1]; |
| 173 | if (getloadavg(loadAvg, std::size(loadAvg)) > 0) { |
| 174 | return loadAvg[0]; |
| 175 | } |
| 176 | return -1.; |
| 177 | } |
| 178 | |
| 179 | private: |
| 180 | const size_t mMaxThreadCount; |
| 181 | const std::string mName; |
| 182 | |
| 183 | std::atomic_size_t mActiveThreadCount = 0; |
| 184 | |
| 185 | std::mutex mThreadLock; |
| 186 | bool mQuit = false; // GUARDED_BY(mThreadLock) |
| 187 | int32_t mNextThreadId = 0; // GUARDED_BY(mThreadLock) |
| 188 | std::list<std::unique_ptr<JavaThread>> mThreads; // GUARDED_BY(mThreadLock) |
| 189 | }; |
| 190 | |
| 191 | /** |
| 192 | * A Perfect HashTable for IDs (key) to pointers (value). |
| 193 | * |
| 194 | * There are no collisions. Why? because we generate the IDs for you to look up :-). |
| 195 | * |
| 196 | * The goal of this hash table is to map an integer ID handle > 0 to a pointer. |
| 197 | * We give these IDs in monotonic order (though we may skip if it were to cause a collision). |
| 198 | * |
| 199 | * The size of the hashtable must be large enough to accommodate the max number of keys. |
| 200 | * We suggest 2x. |
| 201 | * |
| 202 | * Readers are lockless |
| 203 | * Single writer could be lockless, but we allow multiple writers through an internal lock. |
| 204 | * |
| 205 | * For the Key type K, valid keys generated are > 0 (signed or unsigned) |
| 206 | * For the Value type V, values are pointers - nullptr means empty. |
| 207 | */ |
| 208 | template <typename K, typename V> |
| 209 | class PerfectHash { |
| 210 | public: |
| 211 | PerfectHash(size_t hashCapacity) |
| 212 | : mHashCapacity(hashCapacity) |
| 213 | , mK2V{new std::atomic<V>[hashCapacity]()} { |
| 214 | } |
| 215 | |
| 216 | // Generate a key for a value V. |
| 217 | // There is a testing function getKforV() which checks what the value reports as its key. |
| 218 | // |
| 219 | // Calls back into getKforV under lock. |
| 220 | // |
| 221 | // We expect that the hashCapacity is 2x the number of stored keys in order |
| 222 | // to have one or two tries to find an empty slot |
| 223 | K generateKey(V value, std::function<K(V)> getKforV, K oldKey = 0) { |
| 224 | std::lock_guard lock(mHashLock); |
| 225 | // try to remove the old key. |
| 226 | if (oldKey > 0) { // key valid |
| 227 | const V v = getValue(oldKey); |
| 228 | if (v != nullptr) { // value still valid |
| 229 | const K atPosition = getKforV(v); |
| 230 | if (atPosition < 0 || // invalid value |
| 231 | atPosition == oldKey || // value's key still valid and matches old key |
| 232 | ((atPosition ^ oldKey) & (mHashCapacity - 1)) != 0) { // stale key entry |
| 233 | getValue(oldKey) = nullptr; // invalidate |
| 234 | } |
| 235 | } // else if value is invalid, no need to invalidate. |
| 236 | } |
| 237 | // check if we are invalidating only. |
| 238 | if (value == nullptr) return 0; |
| 239 | // now insert the new value and return the key. |
| 240 | size_t tries = 0; |
| 241 | for (; tries < mHashCapacity; ++tries) { |
| 242 | mNextKey = mNextKey == std::numeric_limits<K>::max() ? 1 : mNextKey + 1; |
| 243 | const V v = getValue(mNextKey); |
| 244 | //ALOGD("tries: %zu, key:%d value:%p", tries, (int)mNextKey, v); |
| 245 | if (v == nullptr) break; // empty |
| 246 | const K atPosition = getKforV(v); |
| 247 | //ALOGD("tries: %zu key atPosition:%d", tries, (int)atPosition); |
| 248 | if (atPosition < 0 || // invalid value |
| 249 | ((atPosition ^ mNextKey) & (mHashCapacity - 1)) != 0) { // stale key entry |
| 250 | break; |
| 251 | } |
| 252 | } |
| 253 | LOG_ALWAYS_FATAL_IF(tries == mHashCapacity, "hash table overflow!"); |
| 254 | //ALOGD("%s: found after %zu tries", __func__, tries); |
| 255 | getValue(mNextKey) = value; |
| 256 | return mNextKey; |
| 257 | } |
| 258 | |
| 259 | std::atomic<V> &getValue(K key) { return mK2V[key & (mHashCapacity - 1)]; } |
| 260 | const std::atomic_int32_t &getValue(K key) const { return mK2V[key & (mHashCapacity - 1)]; } |
| 261 | |
| 262 | private: |
| 263 | mutable std::mutex mHashLock; |
| 264 | const size_t mHashCapacity; // size of mK2V no lock needed. |
| 265 | std::unique_ptr<std::atomic<V>[]> mK2V; // no lock needed for read access. |
| 266 | K mNextKey{}; // GUARDED_BY(mHashLock) |
| 267 | }; |
| 268 | |
| 269 | /** |
| 270 | * StreamMap contains the all the valid streams available to SoundPool. |
| 271 | * |
| 272 | * There is no Lock required for this class because the streams are |
| 273 | * allocated in the constructor, the lookup is lockless, and the Streams |
| 274 | * returned are locked internally. |
| 275 | * |
| 276 | * The lookup uses a perfect hash. |
| 277 | * It is possible to use a lockless hash table or to use a stripe-locked concurrent |
| 278 | * hashmap for essentially lock-free lookup. |
| 279 | * |
| 280 | * This follows Map-Reduce parallelism model. |
| 281 | * https://en.wikipedia.org/wiki/MapReduce |
| 282 | * |
| 283 | * Conceivably the forEach could be parallelized using std::for_each with a |
| 284 | * std::execution::par policy. |
| 285 | * |
| 286 | * https://en.cppreference.com/w/cpp/algorithm/for_each |
| 287 | */ |
| 288 | class StreamMap { |
| 289 | public: |
| 290 | explicit StreamMap(int32_t streams); |
| 291 | |
| 292 | // Returns the stream associated with streamID or nullptr if not found. |
| 293 | // This need not be locked. |
| 294 | // The stream ID will never migrate to another Stream, but it may change |
| 295 | // underneath you. The Stream operations that take a streamID will confirm |
| 296 | // that the streamID matches under the Stream lock before executing otherwise |
| 297 | // it ignores the command as stale. |
| 298 | Stream* findStream(int32_t streamID) const; |
| 299 | |
| 300 | // Iterates through the stream pool applying the function f. |
| 301 | // Since this enumerates over every single stream, it is unlocked. |
| 302 | // |
| 303 | // See related: https://en.cppreference.com/w/cpp/algorithm/for_each |
| 304 | void forEach(std::function<void(const Stream *)>f) const { |
| 305 | for (size_t i = 0; i < mStreamPoolSize; ++i) { |
| 306 | f(&mStreamPool[i]); |
| 307 | } |
| 308 | } |
| 309 | |
| 310 | void forEach(std::function<void(Stream *)>f) { |
| 311 | for (size_t i = 0; i < mStreamPoolSize; ++i) { |
| 312 | f(&mStreamPool[i]); |
| 313 | } |
| 314 | } |
| 315 | |
| 316 | // Returns the pair stream for a given Stream. |
| 317 | // This need not be locked as it is a property of the pointer address. |
| 318 | Stream* getPairStream(const Stream* stream) const { |
| 319 | const size_t index = streamPosition(stream); |
| 320 | return &mStreamPool[index ^ 1]; |
| 321 | } |
| 322 | |
| 323 | // find the position of the stream in mStreamPool array. |
| 324 | size_t streamPosition(const Stream* stream) const; // no lock needed |
| 325 | |
| 326 | size_t getStreamMapSize() const { |
| 327 | return mStreamPoolSize; |
| 328 | } |
| 329 | |
| 330 | // find the next valid ID for a stream and store in hash table. |
| 331 | int32_t getNextIdForStream(Stream* stream) const; |
| 332 | |
| 333 | private: |
| 334 | |
| 335 | // use the hash table to attempt to find the stream. |
| 336 | // nullptr is returned if the lookup fails. |
| 337 | Stream* lookupStreamFromId(int32_t streamID) const; |
| 338 | |
| 339 | // The stream pool is initialized in the constructor, effectively const. |
| 340 | // no locking required for access. |
| 341 | // |
| 342 | // The constructor parameter "streams" results in streams pairs of streams. |
| 343 | // We have twice as many streams because we wish to return a streamID "handle" |
| 344 | // back to the app immediately, while we may be stopping the other stream in the |
| 345 | // pair to get its AudioTrack :-). |
| 346 | // |
| 347 | // Of the stream pair, only one of the streams may have an AudioTrack. |
| 348 | // The fixed association of a stream pair allows callbacks from the AudioTrack |
| 349 | // to be associated properly to either one or the other of the stream pair. |
| 350 | // |
| 351 | // TODO: The stream pair arrangement can be removed if we have better AudioTrack |
| 352 | // callback handling (being able to remove and change the callback after construction). |
| 353 | // |
| 354 | // Streams may be accessed anytime off of the stream pool |
| 355 | // as there is internal locking on each stream. |
| 356 | std::unique_ptr<Stream[]> mStreamPool; // no lock needed for access. |
| 357 | size_t mStreamPoolSize; // no lock needed for access. |
| 358 | |
| 359 | // In order to find the Stream from a StreamID, we could do a linear lookup in mStreamPool. |
| 360 | // As an alternative, one could use stripe-locked or lock-free concurrent hashtables. |
| 361 | // |
| 362 | // When considering linear search vs hashmap, verify the typical use-case size. |
| 363 | // Linear search is faster than std::unordered_map (circa 2018) for less than 40 elements. |
| 364 | // [ Skarupke, M. (2018), "You Can Do Better than std::unordered_map: New and Recent |
| 365 | // Improvements to Hash Table Performance." C++Now 2018. cppnow.org, see |
| 366 | // https://www.youtube.com/watch?v=M2fKMP47slQ ] |
| 367 | // |
| 368 | // Here, we use a PerfectHash of Id to Stream *, since we can control the |
| 369 | // StreamID returned to the user. This allows O(1) read access to mStreamPool lock-free. |
| 370 | // |
| 371 | // We prefer that the next stream ID is monotonic for aesthetic reasons |
| 372 | // (if we didn't care about monotonicity, a simple method is to apply a generation count |
| 373 | // to each stream in the unused upper bits of its index in mStreamPool for the id). |
| 374 | // |
| 375 | std::unique_ptr<PerfectHash<int32_t, Stream *>> mPerfectHash; |
| 376 | }; |
| 377 | |
| 378 | /** |
| 379 | * StreamManager is used to manage the streams (accessed by StreamID from Java). |
| 380 | * |
| 381 | * Locking order (proceeds from application to component). |
| 382 | * SoundPool mApiLock (if needed) -> StreamManager mStreamManagerLock |
| 383 | * -> pair Stream mLock -> queued Stream mLock |
| 384 | */ |
| 385 | class StreamManager : public StreamMap { |
| 386 | public: |
| 387 | // Note: the SoundPool pointer is only used for stream initialization. |
| 388 | // It is not stored in StreamManager. |
| 389 | StreamManager(int32_t streams, size_t threads, const audio_attributes_t* attributes); |
| 390 | ~StreamManager(); |
| 391 | |
| 392 | // Returns positive streamID on success, 0 on failure. This is locked. |
| 393 | int32_t queueForPlay(const std::shared_ptr<Sound> &sound, |
| 394 | int32_t soundID, float leftVolume, float rightVolume, |
| 395 | int32_t priority, int32_t loop, float rate); |
| 396 | |
| 397 | /////////////////////////////////////////////////////////////////////// |
| 398 | // Called from soundpool::Stream |
| 399 | |
| 400 | const audio_attributes_t* getAttributes() const { return &mAttributes; } |
| 401 | |
| 402 | // Moves the stream to the restart queue (called upon BUFFER_END of the static track) |
| 403 | // this is locked internally. |
| 404 | // If activeStreamIDToMatch is nonzero, it will only move to the restart queue |
| 405 | // if the streamIDToMatch is found on the active queue. |
| 406 | void moveToRestartQueue(Stream* stream, int32_t activeStreamIDToMatch = 0); |
| 407 | |
| 408 | private: |
| 409 | |
| 410 | void run(int32_t id); // worker thread, takes lock internally. |
| 411 | void dump() const; // no lock needed |
| 412 | |
| 413 | // returns true if more worker threads are needed. |
| 414 | bool needMoreThreads_l() { |
| 415 | return mRestartStreams.size() > 0 && |
| 416 | (mThreadPool->getActiveThreadCount() == 0 |
| 417 | || std::distance(mRestartStreams.begin(), |
| 418 | mRestartStreams.upper_bound(systemTime())) |
| 419 | > (ptrdiff_t)mThreadPool->getActiveThreadCount()); |
| 420 | } |
| 421 | |
| 422 | // returns true if the stream was added. |
| 423 | bool moveToRestartQueue_l(Stream* stream, int32_t activeStreamIDToMatch = 0); |
| 424 | // returns number of queues the stream was removed from (should be 0 or 1); |
| 425 | // a special code of -1 is returned if activeStreamIDToMatch is > 0 and |
| 426 | // the stream wasn't found on the active queue. |
| 427 | ssize_t removeFromQueues_l(Stream* stream, int32_t activeStreamIDToMatch = 0); |
| 428 | void addToRestartQueue_l(Stream *stream); |
| 429 | void addToActiveQueue_l(Stream *stream); |
| 430 | void sanityCheckQueue_l() const; |
| 431 | |
| 432 | const audio_attributes_t mAttributes; |
| 433 | std::unique_ptr<ThreadPool> mThreadPool; // locked internally |
| 434 | |
| 435 | // mStreamManagerLock is used to lock access for transitions between the |
| 436 | // 4 stream queues by the Manager Thread or by the user initiated play(). |
| 437 | // A stream pair has exactly one stream on exactly one of the queues. |
| 438 | std::mutex mStreamManagerLock; |
| 439 | std::condition_variable mStreamManagerCondition; |
| 440 | |
| 441 | bool mQuit = false; // GUARDED_BY(mStreamManagerLock) |
| 442 | |
| 443 | // There are constructor arg "streams" pairs of streams, only one of each |
| 444 | // pair on the 4 stream queues below. The other stream in the pair serves as |
| 445 | // placeholder to accumulate user changes, pending actual availability of the |
| 446 | // AudioTrack, as it may be in use, requiring stop-then-restart. |
| 447 | // |
| 448 | // The 4 queues are implemented in the appropriate STL container based on perceived |
| 449 | // optimality. |
| 450 | |
| 451 | // 1) mRestartStreams: Streams awaiting stop. |
| 452 | // The paired stream may be active (but with no AudioTrack), and will be restarted |
| 453 | // with an active AudioTrack when the current stream is stopped. |
| 454 | std::multimap<int64_t /* stopTimeNs */, Stream*> |
| 455 | mRestartStreams; // GUARDED_BY(mStreamManagerLock) |
| 456 | |
| 457 | // 2) mActiveStreams: Streams that are active. |
| 458 | // The paired stream will be inactive. |
| 459 | // This is in order of specified by kStealActiveStream_OldestFirst |
| 460 | std::list<Stream*> mActiveStreams; // GUARDED_BY(mStreamManagerLock) |
| 461 | |
| 462 | // 3) mAvailableStreams: Streams that are inactive. |
| 463 | // The paired stream will also be inactive. |
| 464 | // No particular order. |
| 465 | std::unordered_set<Stream*> mAvailableStreams; // GUARDED_BY(mStreamManagerLock) |
| 466 | |
| 467 | // 4) mProcessingStreams: Streams that are being processed by the ManagerThreads |
| 468 | // When on this queue, the stream and its pair are not available for stealing. |
| 469 | // Each ManagerThread will have at most one stream on the mProcessingStreams queue. |
| 470 | // The paired stream may be active or restarting. |
| 471 | // No particular order. |
| 472 | std::unordered_set<Stream*> mProcessingStreams; // GUARDED_BY(mStreamManagerLock) |
| 473 | }; |
| 474 | |
| 475 | } // namespace android::soundpool |