/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17#pragma once
18
19#include "Stream.h"
20
21#include <condition_variable>
22#include <future>
23#include <list>
24#include <map>
25#include <memory>
26#include <mutex>
27#include <unordered_set>
28#include <vector>
29
30#include <utils/AndroidThreads.h>
31
32namespace android::soundpool {
33
// TODO: Move the helper classes to a utility file, with a separate test.
35
36/**
37 * JavaThread is used like std::thread but for threads that may call the JVM.
38 *
39 * std::thread does not easily attach to the JVM. We need JVM capable threads
40 * from createThreadEtc() since android binder call optimization may attempt to
41 * call back into Java if the SoundPool runs in system server.
42 *
43 *
44 * No locking is required - the member variables are inherently thread-safe.
45 */
46class JavaThread {
47public:
48 JavaThread(std::function<void()> f, const char *name)
49 : mF{std::move(f)} {
50 createThreadEtc(staticFunction, this, name);
51 }
52
53 JavaThread(JavaThread &&) = delete; // uses "this" ptr, not moveable.
54
Andy Hungeaeced32020-03-03 13:40:49 -080055 ~JavaThread() {
56 join(); // manually block until the future is ready as std::future
57 // destructor doesn't block unless it comes from std::async
58 // and it is the last reference to shared state.
59 }
60
Andy Hunge7937b92019-08-28 21:02:23 -070061 void join() const {
62 mFuture.wait();
63 }
64
65 bool isClosed() const {
66 return mIsClosed;
67 }
68
69private:
70 static int staticFunction(void *data) {
71 JavaThread *jt = static_cast<JavaThread *>(data);
72 jt->mF();
Andy Hung9bebf812020-03-10 19:06:42 -070073 jt->mIsClosed = true; // set the flag that we are closed
74 // now before we allow the destructor to execute;
75 // otherwise there may be a use after free.
Andy Hunge7937b92019-08-28 21:02:23 -070076 jt->mPromise.set_value();
Andy Hunge7937b92019-08-28 21:02:23 -070077 return 0;
78 }
79
80 // No locking is provided as these variables are initialized in the constructor
81 // and the members referenced are thread-safe objects.
82 // (mFuture.wait() can block multiple threads.)
83 // Note the order of member variables is reversed for destructor.
84 const std::function<void()> mF;
85 // Used in join() to block until the thread completes.
86 // See https://en.cppreference.com/w/cpp/thread/promise for the void specialization of
87 // promise.
88 std::promise<void> mPromise;
89 std::future<void> mFuture{mPromise.get_future()};
90 std::atomic_bool mIsClosed = false;
91};
92
93/**
94 * The ThreadPool manages thread lifetimes of SoundPool worker threads.
95 *
96 * TODO: the (eventual) goal of ThreadPool is to transparently and cooperatively
97 * maximize CPU utilization while avoiding starvation of other applications.
98 * Some possibilities:
99 *
100 * We should create worker threads when we have SoundPool work and the system is idle.
101 * CPU cycles are "use-it-or-lose-it" when the system is idle.
102 *
103 * We should adjust the priority of worker threads so that the second (and subsequent) worker
104 * threads have lower priority (should we try to promote priority also?).
105 *
106 * We should throttle the spawning of new worker threads, spacing over time, to avoid
107 * creating too many new threads all at once, on initialization.
108 */
109class ThreadPool {
110public:
111 ThreadPool(size_t maxThreadCount, std::string name)
112 : mMaxThreadCount(maxThreadCount)
113 , mName{std::move(name)} { }
114
115 ~ThreadPool() { quit(); }
116
117 size_t getActiveThreadCount() const { return mActiveThreadCount; }
118 size_t getMaxThreadCount() const { return mMaxThreadCount; }
119
120 void quit() {
121 std::list<std::unique_ptr<JavaThread>> threads;
122 {
123 std::lock_guard lock(mThreadLock);
124 if (mQuit) return; // already joined.
125 mQuit = true;
126 threads = std::move(mThreads);
127 mThreads.clear();
128 }
129 // mQuit set under lock, no more threads will be created.
130 for (auto &thread : threads) {
131 thread->join();
132 thread.reset();
133 }
134 LOG_ALWAYS_FATAL_IF(mActiveThreadCount != 0,
135 "Invalid Active Threads: %zu", (size_t)mActiveThreadCount);
136 }
137
138 // returns a non-zero id if successful, the id is to help logging messages.
139 int32_t launch(std::function<void(int32_t /* id */)> f) {
140 std::list<std::unique_ptr<JavaThread>> threadsToRelease; // release outside of lock.
141 std::lock_guard lock(mThreadLock);
142 if (mQuit) return 0; // ignore if we have quit
143
144 // clean up threads.
145 for (auto it = mThreads.begin(); it != mThreads.end(); ) {
146 if ((*it)->isClosed()) {
147 threadsToRelease.emplace_back(std::move(*it));
148 it = mThreads.erase(it);
149 } else {
150 ++it;
151 }
152 }
153
154 const size_t threadCount = mThreads.size();
155 if (threadCount < mMaxThreadCount) {
156 // if the id wraps, we don't care about collisions. it's just for logging.
157 mNextThreadId = mNextThreadId == INT32_MAX ? 1 : ++mNextThreadId;
158 const int32_t id = mNextThreadId;
159 mThreads.emplace_back(std::make_unique<JavaThread>(
160 [this, id, mf = std::move(f)] { mf(id); --mActiveThreadCount; },
161 (mName + std::to_string(id)).c_str()));
162 ++mActiveThreadCount;
163 return id;
164 }
165 return 0;
166 }
167
168 // TODO: launch only if load average is low.
169 // This gets the load average
170 // See also std::thread::hardware_concurrency() for the concurrent capability.
171 static double getLoadAvg() {
172 double loadAvg[1];
173 if (getloadavg(loadAvg, std::size(loadAvg)) > 0) {
174 return loadAvg[0];
175 }
176 return -1.;
177 }
178
179private:
180 const size_t mMaxThreadCount;
181 const std::string mName;
182
183 std::atomic_size_t mActiveThreadCount = 0;
184
185 std::mutex mThreadLock;
186 bool mQuit = false; // GUARDED_BY(mThreadLock)
187 int32_t mNextThreadId = 0; // GUARDED_BY(mThreadLock)
188 std::list<std::unique_ptr<JavaThread>> mThreads; // GUARDED_BY(mThreadLock)
189};
190
191/**
192 * A Perfect HashTable for IDs (key) to pointers (value).
193 *
194 * There are no collisions. Why? because we generate the IDs for you to look up :-).
195 *
196 * The goal of this hash table is to map an integer ID handle > 0 to a pointer.
197 * We give these IDs in monotonic order (though we may skip if it were to cause a collision).
198 *
199 * The size of the hashtable must be large enough to accommodate the max number of keys.
200 * We suggest 2x.
201 *
202 * Readers are lockless
203 * Single writer could be lockless, but we allow multiple writers through an internal lock.
204 *
205 * For the Key type K, valid keys generated are > 0 (signed or unsigned)
206 * For the Value type V, values are pointers - nullptr means empty.
207 */
208template <typename K, typename V>
209class PerfectHash {
210public:
211 PerfectHash(size_t hashCapacity)
212 : mHashCapacity(hashCapacity)
213 , mK2V{new std::atomic<V>[hashCapacity]()} {
214 }
215
216 // Generate a key for a value V.
217 // There is a testing function getKforV() which checks what the value reports as its key.
218 //
219 // Calls back into getKforV under lock.
220 //
221 // We expect that the hashCapacity is 2x the number of stored keys in order
222 // to have one or two tries to find an empty slot
223 K generateKey(V value, std::function<K(V)> getKforV, K oldKey = 0) {
224 std::lock_guard lock(mHashLock);
225 // try to remove the old key.
226 if (oldKey > 0) { // key valid
227 const V v = getValue(oldKey);
228 if (v != nullptr) { // value still valid
229 const K atPosition = getKforV(v);
230 if (atPosition < 0 || // invalid value
231 atPosition == oldKey || // value's key still valid and matches old key
232 ((atPosition ^ oldKey) & (mHashCapacity - 1)) != 0) { // stale key entry
233 getValue(oldKey) = nullptr; // invalidate
234 }
235 } // else if value is invalid, no need to invalidate.
236 }
237 // check if we are invalidating only.
238 if (value == nullptr) return 0;
239 // now insert the new value and return the key.
240 size_t tries = 0;
241 for (; tries < mHashCapacity; ++tries) {
242 mNextKey = mNextKey == std::numeric_limits<K>::max() ? 1 : mNextKey + 1;
243 const V v = getValue(mNextKey);
244 //ALOGD("tries: %zu, key:%d value:%p", tries, (int)mNextKey, v);
245 if (v == nullptr) break; // empty
246 const K atPosition = getKforV(v);
247 //ALOGD("tries: %zu key atPosition:%d", tries, (int)atPosition);
248 if (atPosition < 0 || // invalid value
249 ((atPosition ^ mNextKey) & (mHashCapacity - 1)) != 0) { // stale key entry
250 break;
251 }
252 }
253 LOG_ALWAYS_FATAL_IF(tries == mHashCapacity, "hash table overflow!");
254 //ALOGD("%s: found after %zu tries", __func__, tries);
255 getValue(mNextKey) = value;
256 return mNextKey;
257 }
258
259 std::atomic<V> &getValue(K key) { return mK2V[key & (mHashCapacity - 1)]; }
260 const std::atomic_int32_t &getValue(K key) const { return mK2V[key & (mHashCapacity - 1)]; }
261
262private:
263 mutable std::mutex mHashLock;
264 const size_t mHashCapacity; // size of mK2V no lock needed.
265 std::unique_ptr<std::atomic<V>[]> mK2V; // no lock needed for read access.
266 K mNextKey{}; // GUARDED_BY(mHashLock)
267};
268
269/**
270 * StreamMap contains the all the valid streams available to SoundPool.
271 *
272 * There is no Lock required for this class because the streams are
273 * allocated in the constructor, the lookup is lockless, and the Streams
274 * returned are locked internally.
275 *
276 * The lookup uses a perfect hash.
277 * It is possible to use a lockless hash table or to use a stripe-locked concurrent
278 * hashmap for essentially lock-free lookup.
279 *
280 * This follows Map-Reduce parallelism model.
281 * https://en.wikipedia.org/wiki/MapReduce
282 *
283 * Conceivably the forEach could be parallelized using std::for_each with a
284 * std::execution::par policy.
285 *
286 * https://en.cppreference.com/w/cpp/algorithm/for_each
287 */
288class StreamMap {
289public:
290 explicit StreamMap(int32_t streams);
291
292 // Returns the stream associated with streamID or nullptr if not found.
293 // This need not be locked.
294 // The stream ID will never migrate to another Stream, but it may change
295 // underneath you. The Stream operations that take a streamID will confirm
296 // that the streamID matches under the Stream lock before executing otherwise
297 // it ignores the command as stale.
298 Stream* findStream(int32_t streamID) const;
299
300 // Iterates through the stream pool applying the function f.
301 // Since this enumerates over every single stream, it is unlocked.
302 //
303 // See related: https://en.cppreference.com/w/cpp/algorithm/for_each
304 void forEach(std::function<void(const Stream *)>f) const {
305 for (size_t i = 0; i < mStreamPoolSize; ++i) {
306 f(&mStreamPool[i]);
307 }
308 }
309
310 void forEach(std::function<void(Stream *)>f) {
311 for (size_t i = 0; i < mStreamPoolSize; ++i) {
312 f(&mStreamPool[i]);
313 }
314 }
315
316 // Returns the pair stream for a given Stream.
317 // This need not be locked as it is a property of the pointer address.
318 Stream* getPairStream(const Stream* stream) const {
319 const size_t index = streamPosition(stream);
320 return &mStreamPool[index ^ 1];
321 }
322
323 // find the position of the stream in mStreamPool array.
324 size_t streamPosition(const Stream* stream) const; // no lock needed
325
326 size_t getStreamMapSize() const {
327 return mStreamPoolSize;
328 }
329
330 // find the next valid ID for a stream and store in hash table.
331 int32_t getNextIdForStream(Stream* stream) const;
332
333private:
334
335 // use the hash table to attempt to find the stream.
336 // nullptr is returned if the lookup fails.
337 Stream* lookupStreamFromId(int32_t streamID) const;
338
339 // The stream pool is initialized in the constructor, effectively const.
340 // no locking required for access.
341 //
342 // The constructor parameter "streams" results in streams pairs of streams.
343 // We have twice as many streams because we wish to return a streamID "handle"
344 // back to the app immediately, while we may be stopping the other stream in the
345 // pair to get its AudioTrack :-).
346 //
347 // Of the stream pair, only one of the streams may have an AudioTrack.
348 // The fixed association of a stream pair allows callbacks from the AudioTrack
349 // to be associated properly to either one or the other of the stream pair.
350 //
351 // TODO: The stream pair arrangement can be removed if we have better AudioTrack
352 // callback handling (being able to remove and change the callback after construction).
353 //
354 // Streams may be accessed anytime off of the stream pool
355 // as there is internal locking on each stream.
356 std::unique_ptr<Stream[]> mStreamPool; // no lock needed for access.
357 size_t mStreamPoolSize; // no lock needed for access.
358
359 // In order to find the Stream from a StreamID, we could do a linear lookup in mStreamPool.
360 // As an alternative, one could use stripe-locked or lock-free concurrent hashtables.
361 //
362 // When considering linear search vs hashmap, verify the typical use-case size.
363 // Linear search is faster than std::unordered_map (circa 2018) for less than 40 elements.
364 // [ Skarupke, M. (2018), "You Can Do Better than std::unordered_map: New and Recent
365 // Improvements to Hash Table Performance." C++Now 2018. cppnow.org, see
366 // https://www.youtube.com/watch?v=M2fKMP47slQ ]
367 //
368 // Here, we use a PerfectHash of Id to Stream *, since we can control the
369 // StreamID returned to the user. This allows O(1) read access to mStreamPool lock-free.
370 //
371 // We prefer that the next stream ID is monotonic for aesthetic reasons
372 // (if we didn't care about monotonicity, a simple method is to apply a generation count
373 // to each stream in the unused upper bits of its index in mStreamPool for the id).
374 //
375 std::unique_ptr<PerfectHash<int32_t, Stream *>> mPerfectHash;
376};
377
378/**
379 * StreamManager is used to manage the streams (accessed by StreamID from Java).
380 *
381 * Locking order (proceeds from application to component).
382 * SoundPool mApiLock (if needed) -> StreamManager mStreamManagerLock
383 * -> pair Stream mLock -> queued Stream mLock
384 */
385class StreamManager : public StreamMap {
386public:
387 // Note: the SoundPool pointer is only used for stream initialization.
388 // It is not stored in StreamManager.
389 StreamManager(int32_t streams, size_t threads, const audio_attributes_t* attributes);
390 ~StreamManager();
391
392 // Returns positive streamID on success, 0 on failure. This is locked.
393 int32_t queueForPlay(const std::shared_ptr<Sound> &sound,
394 int32_t soundID, float leftVolume, float rightVolume,
395 int32_t priority, int32_t loop, float rate);
396
397 ///////////////////////////////////////////////////////////////////////
398 // Called from soundpool::Stream
399
400 const audio_attributes_t* getAttributes() const { return &mAttributes; }
401
402 // Moves the stream to the restart queue (called upon BUFFER_END of the static track)
403 // this is locked internally.
404 // If activeStreamIDToMatch is nonzero, it will only move to the restart queue
405 // if the streamIDToMatch is found on the active queue.
406 void moveToRestartQueue(Stream* stream, int32_t activeStreamIDToMatch = 0);
407
408private:
409
410 void run(int32_t id); // worker thread, takes lock internally.
411 void dump() const; // no lock needed
412
413 // returns true if more worker threads are needed.
414 bool needMoreThreads_l() {
415 return mRestartStreams.size() > 0 &&
416 (mThreadPool->getActiveThreadCount() == 0
417 || std::distance(mRestartStreams.begin(),
418 mRestartStreams.upper_bound(systemTime()))
419 > (ptrdiff_t)mThreadPool->getActiveThreadCount());
420 }
421
422 // returns true if the stream was added.
423 bool moveToRestartQueue_l(Stream* stream, int32_t activeStreamIDToMatch = 0);
424 // returns number of queues the stream was removed from (should be 0 or 1);
425 // a special code of -1 is returned if activeStreamIDToMatch is > 0 and
426 // the stream wasn't found on the active queue.
427 ssize_t removeFromQueues_l(Stream* stream, int32_t activeStreamIDToMatch = 0);
428 void addToRestartQueue_l(Stream *stream);
429 void addToActiveQueue_l(Stream *stream);
430 void sanityCheckQueue_l() const;
431
432 const audio_attributes_t mAttributes;
433 std::unique_ptr<ThreadPool> mThreadPool; // locked internally
434
435 // mStreamManagerLock is used to lock access for transitions between the
436 // 4 stream queues by the Manager Thread or by the user initiated play().
437 // A stream pair has exactly one stream on exactly one of the queues.
438 std::mutex mStreamManagerLock;
439 std::condition_variable mStreamManagerCondition;
440
441 bool mQuit = false; // GUARDED_BY(mStreamManagerLock)
442
443 // There are constructor arg "streams" pairs of streams, only one of each
444 // pair on the 4 stream queues below. The other stream in the pair serves as
445 // placeholder to accumulate user changes, pending actual availability of the
446 // AudioTrack, as it may be in use, requiring stop-then-restart.
447 //
448 // The 4 queues are implemented in the appropriate STL container based on perceived
449 // optimality.
450
451 // 1) mRestartStreams: Streams awaiting stop.
452 // The paired stream may be active (but with no AudioTrack), and will be restarted
453 // with an active AudioTrack when the current stream is stopped.
454 std::multimap<int64_t /* stopTimeNs */, Stream*>
455 mRestartStreams; // GUARDED_BY(mStreamManagerLock)
456
457 // 2) mActiveStreams: Streams that are active.
458 // The paired stream will be inactive.
459 // This is in order of specified by kStealActiveStream_OldestFirst
460 std::list<Stream*> mActiveStreams; // GUARDED_BY(mStreamManagerLock)
461
462 // 3) mAvailableStreams: Streams that are inactive.
463 // The paired stream will also be inactive.
464 // No particular order.
465 std::unordered_set<Stream*> mAvailableStreams; // GUARDED_BY(mStreamManagerLock)
466
467 // 4) mProcessingStreams: Streams that are being processed by the ManagerThreads
468 // When on this queue, the stream and its pair are not available for stealing.
469 // Each ManagerThread will have at most one stream on the mProcessingStreams queue.
470 // The paired stream may be active or restarting.
471 // No particular order.
472 std::unordered_set<Stream*> mProcessingStreams; // GUARDED_BY(mStreamManagerLock)
473};
474
475} // namespace android::soundpool