Garfield Tan | a354983 | 2018-09-24 15:22:18 -0700 | [diff] [blame] | 1 | /* |
| 2 | * Copyright (C) 2018 The Android Open Source Project |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
Wale Ogunwale | 5950709 | 2018-10-29 09:00:30 -0700 | [diff] [blame] | 17 | package com.android.server.wm; |
Garfield Tan | a354983 | 2018-09-24 15:22:18 -0700 | [diff] [blame] | 18 | |
| 19 | import android.os.Process; |
| 20 | import android.os.SystemClock; |
| 21 | import android.util.Slog; |
| 22 | |
| 23 | import com.android.internal.annotations.VisibleForTesting; |
| 24 | |
| 25 | import java.util.ArrayList; |
| 26 | import java.util.function.Predicate; |
| 27 | |
| 28 | /** |
| 29 | * The common threading logic for persisters to use so that they can run in the same threads. |
| 30 | * Methods in this class are synchronized on its instance, so caller could also synchronize on |
| 31 | * its instance to perform modifications in items. |
| 32 | */ |
| 33 | class PersisterQueue { |
| 34 | private static final String TAG = "PersisterQueue"; |
| 35 | private static final boolean DEBUG = false; |
| 36 | |
| 37 | /** When not flushing don't write out files faster than this */ |
| 38 | private static final long INTER_WRITE_DELAY_MS = 500; |
| 39 | |
| 40 | /** |
| 41 | * When not flushing delay this long before writing the first file out. This gives the next task |
| 42 | * being launched a chance to load its resources without this occupying IO bandwidth. |
| 43 | */ |
| 44 | private static final long PRE_TASK_DELAY_MS = 3000; |
| 45 | |
| 46 | /** The maximum number of entries to keep in the queue before draining it automatically. */ |
| 47 | private static final int MAX_WRITE_QUEUE_LENGTH = 6; |
| 48 | |
| 49 | /** Special value for mWriteTime to mean don't wait, just write */ |
| 50 | private static final long FLUSH_QUEUE = -1; |
| 51 | |
| 52 | /** An {@link WriteQueueItem} that doesn't do anything. Used to trigger {@link |
| 53 | * Listener#onPreProcessItem}. */ |
| 54 | static final WriteQueueItem EMPTY_ITEM = () -> { }; |
| 55 | |
| 56 | private final long mInterWriteDelayMs; |
| 57 | private final long mPreTaskDelayMs; |
| 58 | private final LazyTaskWriterThread mLazyTaskWriterThread; |
| 59 | private final ArrayList<WriteQueueItem> mWriteQueue = new ArrayList<>(); |
| 60 | |
| 61 | private final ArrayList<Listener> mListeners = new ArrayList<>(); |
| 62 | |
| 63 | /** |
| 64 | * Value determines write delay mode as follows: < 0 We are Flushing. No delays between writes |
| 65 | * until the image queue is drained and all tasks needing persisting are written to disk. There |
| 66 | * is no delay between writes. == 0 We are Idle. Next writes will be delayed by |
| 67 | * #PRE_TASK_DELAY_MS. > 0 We are Actively writing. Next write will be at this time. Subsequent |
| 68 | * writes will be delayed by #INTER_WRITE_DELAY_MS. |
| 69 | */ |
| 70 | private long mNextWriteTime = 0; |
| 71 | |
| 72 | PersisterQueue() { |
| 73 | this(INTER_WRITE_DELAY_MS, PRE_TASK_DELAY_MS); |
| 74 | } |
| 75 | |
| 76 | /** Used for tests to reduce waiting time. */ |
| 77 | @VisibleForTesting |
| 78 | PersisterQueue(long interWriteDelayMs, long preTaskDelayMs) { |
| 79 | if (interWriteDelayMs < 0 || preTaskDelayMs < 0) { |
| 80 | throw new IllegalArgumentException("Both inter-write delay and pre-task delay need to" |
| 81 | + "be non-negative. inter-write delay: " + interWriteDelayMs |
| 82 | + "ms pre-task delay: " + preTaskDelayMs); |
| 83 | } |
| 84 | mInterWriteDelayMs = interWriteDelayMs; |
| 85 | mPreTaskDelayMs = preTaskDelayMs; |
| 86 | mLazyTaskWriterThread = new LazyTaskWriterThread("LazyTaskWriterThread"); |
| 87 | } |
| 88 | |
| 89 | synchronized void startPersisting() { |
| 90 | if (!mLazyTaskWriterThread.isAlive()) { |
| 91 | mLazyTaskWriterThread.start(); |
| 92 | } |
| 93 | } |
| 94 | |
| 95 | /** Stops persisting thread. Should only be used in tests. */ |
| 96 | @VisibleForTesting |
| 97 | void stopPersisting() throws InterruptedException { |
| 98 | if (!mLazyTaskWriterThread.isAlive()) { |
| 99 | return; |
| 100 | } |
| 101 | |
| 102 | synchronized (this) { |
| 103 | mLazyTaskWriterThread.interrupt(); |
| 104 | } |
| 105 | mLazyTaskWriterThread.join(); |
| 106 | } |
| 107 | |
| 108 | synchronized void addItem(WriteQueueItem item, boolean flush) { |
| 109 | mWriteQueue.add(item); |
| 110 | |
| 111 | if (flush || mWriteQueue.size() > MAX_WRITE_QUEUE_LENGTH) { |
| 112 | mNextWriteTime = FLUSH_QUEUE; |
| 113 | } else if (mNextWriteTime == 0) { |
| 114 | mNextWriteTime = SystemClock.uptimeMillis() + mPreTaskDelayMs; |
| 115 | } |
| 116 | notify(); |
| 117 | } |
| 118 | |
| 119 | synchronized <T extends WriteQueueItem> T findLastItem(Predicate<T> predicate, Class<T> clazz) { |
| 120 | for (int i = mWriteQueue.size() - 1; i >= 0; --i) { |
| 121 | WriteQueueItem writeQueueItem = mWriteQueue.get(i); |
| 122 | if (clazz.isInstance(writeQueueItem)) { |
| 123 | T item = clazz.cast(writeQueueItem); |
| 124 | if (predicate.test(item)) { |
| 125 | return item; |
| 126 | } |
| 127 | } |
| 128 | } |
| 129 | |
| 130 | return null; |
| 131 | } |
| 132 | |
Garfield Tan | 891146c | 2018-10-09 12:14:00 -0700 | [diff] [blame] | 133 | /** |
Garfield Tan | 0d407f4 | 2019-03-07 11:47:01 -0800 | [diff] [blame] | 134 | * Updates the last item found in the queue that matches the given item, or adds it to the end |
| 135 | * of the queue if no such item is found. |
Garfield Tan | 891146c | 2018-10-09 12:14:00 -0700 | [diff] [blame] | 136 | */ |
| 137 | synchronized <T extends WriteQueueItem> void updateLastOrAddItem(T item, boolean flush) { |
| 138 | final T itemToUpdate = findLastItem(item::matches, (Class<T>) item.getClass()); |
| 139 | if (itemToUpdate == null) { |
| 140 | addItem(item, flush); |
| 141 | } else { |
| 142 | itemToUpdate.updateFrom(item); |
| 143 | } |
| 144 | |
| 145 | yieldIfQueueTooDeep(); |
| 146 | } |
| 147 | |
| 148 | /** |
| 149 | * Removes all items with which given predicate returns {@code true}. |
Garfield Tan | 891146c | 2018-10-09 12:14:00 -0700 | [diff] [blame] | 150 | */ |
Garfield Tan | a354983 | 2018-09-24 15:22:18 -0700 | [diff] [blame] | 151 | synchronized <T extends WriteQueueItem> void removeItems(Predicate<T> predicate, |
| 152 | Class<T> clazz) { |
| 153 | for (int i = mWriteQueue.size() - 1; i >= 0; --i) { |
| 154 | WriteQueueItem writeQueueItem = mWriteQueue.get(i); |
| 155 | if (clazz.isInstance(writeQueueItem)) { |
| 156 | T item = clazz.cast(writeQueueItem); |
| 157 | if (predicate.test(item)) { |
| 158 | if (DEBUG) Slog.d(TAG, "Removing " + item + " from write queue."); |
| 159 | mWriteQueue.remove(i); |
| 160 | } |
| 161 | } |
| 162 | } |
| 163 | } |
| 164 | |
| 165 | synchronized void flush() { |
| 166 | mNextWriteTime = FLUSH_QUEUE; |
| 167 | notifyAll(); |
| 168 | do { |
| 169 | try { |
| 170 | wait(); |
| 171 | } catch (InterruptedException e) { |
| 172 | } |
| 173 | } while (mNextWriteTime == FLUSH_QUEUE); |
| 174 | } |
| 175 | |
| 176 | void yieldIfQueueTooDeep() { |
| 177 | boolean stall = false; |
| 178 | synchronized (this) { |
| 179 | if (mNextWriteTime == FLUSH_QUEUE) { |
| 180 | stall = true; |
| 181 | } |
| 182 | } |
| 183 | if (stall) { |
| 184 | Thread.yield(); |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | void addListener(Listener listener) { |
| 189 | mListeners.add(listener); |
| 190 | } |
| 191 | |
Tadashi G. Takaoka | 74ccec2 | 2018-10-23 11:07:13 +0900 | [diff] [blame] | 192 | @VisibleForTesting |
| 193 | boolean removeListener(Listener listener) { |
| 194 | return mListeners.remove(listener); |
| 195 | } |
| 196 | |
Garfield Tan | a354983 | 2018-09-24 15:22:18 -0700 | [diff] [blame] | 197 | private void processNextItem() throws InterruptedException { |
| 198 | // This part is extracted into a method so that the GC can clearly see the end of the |
| 199 | // scope of the variable 'item'. If this part was in the loop in LazyTaskWriterThread, the |
| 200 | // last item it processed would always "leak". |
| 201 | // See https://b.corp.google.com/issues/64438652#comment7 |
| 202 | |
| 203 | // If mNextWriteTime, then don't delay between each call to saveToXml(). |
| 204 | final WriteQueueItem item; |
| 205 | synchronized (this) { |
| 206 | if (mNextWriteTime != FLUSH_QUEUE) { |
| 207 | // The next write we don't have to wait so long. |
| 208 | mNextWriteTime = SystemClock.uptimeMillis() + mInterWriteDelayMs; |
| 209 | if (DEBUG) { |
| 210 | Slog.d(TAG, "Next write time may be in " + mInterWriteDelayMs |
| 211 | + " msec. (" + mNextWriteTime + ")"); |
| 212 | } |
| 213 | } |
| 214 | |
| 215 | while (mWriteQueue.isEmpty()) { |
| 216 | if (mNextWriteTime != 0) { |
| 217 | mNextWriteTime = 0; // idle. |
| 218 | notify(); // May need to wake up flush(). |
| 219 | } |
| 220 | // Make sure we exit this thread correctly when interrupted before going to |
| 221 | // indefinite wait. |
| 222 | if (Thread.currentThread().isInterrupted()) { |
| 223 | throw new InterruptedException(); |
| 224 | } |
| 225 | if (DEBUG) Slog.d(TAG, "LazyTaskWriter: waiting indefinitely."); |
| 226 | wait(); |
| 227 | // Invariant: mNextWriteTime is either FLUSH_QUEUE or PRE_WRITE_DELAY_MS |
| 228 | // from now. |
| 229 | } |
| 230 | item = mWriteQueue.remove(0); |
| 231 | |
| 232 | long now = SystemClock.uptimeMillis(); |
| 233 | if (DEBUG) { |
| 234 | Slog.d(TAG, "LazyTaskWriter: now=" + now + " mNextWriteTime=" + mNextWriteTime |
| 235 | + " mWriteQueue.size=" + mWriteQueue.size()); |
| 236 | } |
| 237 | while (now < mNextWriteTime) { |
| 238 | if (DEBUG) { |
| 239 | Slog.d(TAG, "LazyTaskWriter: waiting " + (mNextWriteTime - now)); |
| 240 | } |
| 241 | wait(mNextWriteTime - now); |
| 242 | now = SystemClock.uptimeMillis(); |
| 243 | } |
| 244 | |
| 245 | // Got something to do. |
| 246 | } |
| 247 | |
| 248 | item.process(); |
| 249 | } |
| 250 | |
Garfield Tan | 891146c | 2018-10-09 12:14:00 -0700 | [diff] [blame] | 251 | interface WriteQueueItem<T extends WriteQueueItem<T>> { |
Garfield Tan | a354983 | 2018-09-24 15:22:18 -0700 | [diff] [blame] | 252 | void process(); |
Garfield Tan | 891146c | 2018-10-09 12:14:00 -0700 | [diff] [blame] | 253 | |
| 254 | default void updateFrom(T item) {} |
| 255 | |
| 256 | default boolean matches(T item) { |
| 257 | return false; |
| 258 | } |
Garfield Tan | a354983 | 2018-09-24 15:22:18 -0700 | [diff] [blame] | 259 | } |
| 260 | |
| 261 | interface Listener { |
| 262 | /** |
| 263 | * Called before {@link PersisterQueue} tries to process next item. |
| 264 | * |
| 265 | * Note if the queue is empty, this callback will be called before the indefinite wait. This |
| 266 | * will be called once when {@link PersisterQueue} starts the internal thread before the |
| 267 | * indefinite wait. |
| 268 | * |
| 269 | * This callback is called w/o locking the instance of {@link PersisterQueue}. |
| 270 | * |
| 271 | * @param queueEmpty {@code true} if the queue is empty, which indicates {@link |
| 272 | * PersisterQueue} is likely to enter indefinite wait; or {@code false} if there is still |
| 273 | * item to process. |
| 274 | */ |
| 275 | void onPreProcessItem(boolean queueEmpty); |
| 276 | } |
| 277 | |
| 278 | private class LazyTaskWriterThread extends Thread { |
| 279 | |
| 280 | private LazyTaskWriterThread(String name) { |
| 281 | super(name); |
| 282 | } |
| 283 | |
| 284 | @Override |
| 285 | public void run() { |
| 286 | Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); |
| 287 | try { |
| 288 | while (true) { |
| 289 | final boolean probablyDone; |
| 290 | synchronized (PersisterQueue.this) { |
| 291 | probablyDone = mWriteQueue.isEmpty(); |
| 292 | } |
| 293 | |
| 294 | for (int i = mListeners.size() - 1; i >= 0; --i) { |
| 295 | mListeners.get(i).onPreProcessItem(probablyDone); |
| 296 | } |
| 297 | |
| 298 | processNextItem(); |
| 299 | } |
| 300 | } catch (InterruptedException e) { |
| 301 | Slog.e(TAG, "Persister thread is exiting. Should never happen in prod, but" |
| 302 | + "it's OK in tests."); |
| 303 | } |
| 304 | } |
| 305 | } |
| 306 | } |