blob: 9dc3d6a81338b79ad59b37a3b0ba140ae61603f2 [file] [log] [blame]
Garfield Tana3549832018-09-24 15:22:18 -07001/*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Wale Ogunwale59507092018-10-29 09:00:30 -070017package com.android.server.wm;
Garfield Tana3549832018-09-24 15:22:18 -070018
19import android.os.Process;
20import android.os.SystemClock;
21import android.util.Slog;
22
23import com.android.internal.annotations.VisibleForTesting;
24
25import java.util.ArrayList;
26import java.util.function.Predicate;
27
28/**
29 * The common threading logic for persisters to use so that they can run in the same threads.
30 * Methods in this class are synchronized on its instance, so caller could also synchronize on
31 * its instance to perform modifications in items.
32 */
33class PersisterQueue {
34 private static final String TAG = "PersisterQueue";
35 private static final boolean DEBUG = false;
36
37 /** When not flushing don't write out files faster than this */
38 private static final long INTER_WRITE_DELAY_MS = 500;
39
40 /**
41 * When not flushing delay this long before writing the first file out. This gives the next task
42 * being launched a chance to load its resources without this occupying IO bandwidth.
43 */
44 private static final long PRE_TASK_DELAY_MS = 3000;
45
46 /** The maximum number of entries to keep in the queue before draining it automatically. */
47 private static final int MAX_WRITE_QUEUE_LENGTH = 6;
48
49 /** Special value for mWriteTime to mean don't wait, just write */
50 private static final long FLUSH_QUEUE = -1;
51
52 /** An {@link WriteQueueItem} that doesn't do anything. Used to trigger {@link
53 * Listener#onPreProcessItem}. */
54 static final WriteQueueItem EMPTY_ITEM = () -> { };
55
56 private final long mInterWriteDelayMs;
57 private final long mPreTaskDelayMs;
58 private final LazyTaskWriterThread mLazyTaskWriterThread;
59 private final ArrayList<WriteQueueItem> mWriteQueue = new ArrayList<>();
60
61 private final ArrayList<Listener> mListeners = new ArrayList<>();
62
63 /**
64 * Value determines write delay mode as follows: < 0 We are Flushing. No delays between writes
65 * until the image queue is drained and all tasks needing persisting are written to disk. There
66 * is no delay between writes. == 0 We are Idle. Next writes will be delayed by
67 * #PRE_TASK_DELAY_MS. > 0 We are Actively writing. Next write will be at this time. Subsequent
68 * writes will be delayed by #INTER_WRITE_DELAY_MS.
69 */
70 private long mNextWriteTime = 0;
71
/**
 * Creates a queue that uses the default delays, {@link #INTER_WRITE_DELAY_MS} and
 * {@link #PRE_TASK_DELAY_MS}.
 */
PersisterQueue() {
    this(INTER_WRITE_DELAY_MS, PRE_TASK_DELAY_MS);
}
75
/**
 * Creates a queue with custom delays. Used for tests to reduce waiting time.
 *
 * @param interWriteDelayMs delay in milliseconds between consecutive writes when not flushing
 * @param preTaskDelayMs delay in milliseconds before the first write when not flushing
 * @throws IllegalArgumentException if either delay is negative
 */
@VisibleForTesting
PersisterQueue(long interWriteDelayMs, long preTaskDelayMs) {
    if (interWriteDelayMs < 0 || preTaskDelayMs < 0) {
        // Note the trailing space after "need to": the two literals are concatenated.
        throw new IllegalArgumentException("Both inter-write delay and pre-task delay need to "
                + "be non-negative. inter-write delay: " + interWriteDelayMs
                + "ms pre-task delay: " + preTaskDelayMs + "ms");
    }
    mInterWriteDelayMs = interWriteDelayMs;
    mPreTaskDelayMs = preTaskDelayMs;
    // The thread is only created here; startPersisting() starts it.
    mLazyTaskWriterThread = new LazyTaskWriterThread("LazyTaskWriterThread");
}
88
89 synchronized void startPersisting() {
90 if (!mLazyTaskWriterThread.isAlive()) {
91 mLazyTaskWriterThread.start();
92 }
93 }
94
95 /** Stops persisting thread. Should only be used in tests. */
96 @VisibleForTesting
97 void stopPersisting() throws InterruptedException {
98 if (!mLazyTaskWriterThread.isAlive()) {
99 return;
100 }
101
102 synchronized (this) {
103 mLazyTaskWriterThread.interrupt();
104 }
105 mLazyTaskWriterThread.join();
106 }
107
108 synchronized void addItem(WriteQueueItem item, boolean flush) {
109 mWriteQueue.add(item);
110
111 if (flush || mWriteQueue.size() > MAX_WRITE_QUEUE_LENGTH) {
112 mNextWriteTime = FLUSH_QUEUE;
113 } else if (mNextWriteTime == 0) {
114 mNextWriteTime = SystemClock.uptimeMillis() + mPreTaskDelayMs;
115 }
116 notify();
117 }
118
119 synchronized <T extends WriteQueueItem> T findLastItem(Predicate<T> predicate, Class<T> clazz) {
120 for (int i = mWriteQueue.size() - 1; i >= 0; --i) {
121 WriteQueueItem writeQueueItem = mWriteQueue.get(i);
122 if (clazz.isInstance(writeQueueItem)) {
123 T item = clazz.cast(writeQueueItem);
124 if (predicate.test(item)) {
125 return item;
126 }
127 }
128 }
129
130 return null;
131 }
132
Garfield Tan891146c2018-10-09 12:14:00 -0700133 /**
Garfield Tan0d407f42019-03-07 11:47:01 -0800134 * Updates the last item found in the queue that matches the given item, or adds it to the end
135 * of the queue if no such item is found.
Garfield Tan891146c2018-10-09 12:14:00 -0700136 */
137 synchronized <T extends WriteQueueItem> void updateLastOrAddItem(T item, boolean flush) {
138 final T itemToUpdate = findLastItem(item::matches, (Class<T>) item.getClass());
139 if (itemToUpdate == null) {
140 addItem(item, flush);
141 } else {
142 itemToUpdate.updateFrom(item);
143 }
144
145 yieldIfQueueTooDeep();
146 }
147
148 /**
149 * Removes all items with which given predicate returns {@code true}.
Garfield Tan891146c2018-10-09 12:14:00 -0700150 */
Garfield Tana3549832018-09-24 15:22:18 -0700151 synchronized <T extends WriteQueueItem> void removeItems(Predicate<T> predicate,
152 Class<T> clazz) {
153 for (int i = mWriteQueue.size() - 1; i >= 0; --i) {
154 WriteQueueItem writeQueueItem = mWriteQueue.get(i);
155 if (clazz.isInstance(writeQueueItem)) {
156 T item = clazz.cast(writeQueueItem);
157 if (predicate.test(item)) {
158 if (DEBUG) Slog.d(TAG, "Removing " + item + " from write queue.");
159 mWriteQueue.remove(i);
160 }
161 }
162 }
163 }
164
165 synchronized void flush() {
166 mNextWriteTime = FLUSH_QUEUE;
167 notifyAll();
168 do {
169 try {
170 wait();
171 } catch (InterruptedException e) {
172 }
173 } while (mNextWriteTime == FLUSH_QUEUE);
174 }
175
176 void yieldIfQueueTooDeep() {
177 boolean stall = false;
178 synchronized (this) {
179 if (mNextWriteTime == FLUSH_QUEUE) {
180 stall = true;
181 }
182 }
183 if (stall) {
184 Thread.yield();
185 }
186 }
187
188 void addListener(Listener listener) {
189 mListeners.add(listener);
190 }
191
Tadashi G. Takaoka74ccec22018-10-23 11:07:13 +0900192 @VisibleForTesting
193 boolean removeListener(Listener listener) {
194 return mListeners.remove(listener);
195 }
196
Garfield Tana3549832018-09-24 15:22:18 -0700197 private void processNextItem() throws InterruptedException {
198 // This part is extracted into a method so that the GC can clearly see the end of the
199 // scope of the variable 'item'. If this part was in the loop in LazyTaskWriterThread, the
200 // last item it processed would always "leak".
201 // See https://b.corp.google.com/issues/64438652#comment7
202
203 // If mNextWriteTime, then don't delay between each call to saveToXml().
204 final WriteQueueItem item;
205 synchronized (this) {
206 if (mNextWriteTime != FLUSH_QUEUE) {
207 // The next write we don't have to wait so long.
208 mNextWriteTime = SystemClock.uptimeMillis() + mInterWriteDelayMs;
209 if (DEBUG) {
210 Slog.d(TAG, "Next write time may be in " + mInterWriteDelayMs
211 + " msec. (" + mNextWriteTime + ")");
212 }
213 }
214
215 while (mWriteQueue.isEmpty()) {
216 if (mNextWriteTime != 0) {
217 mNextWriteTime = 0; // idle.
218 notify(); // May need to wake up flush().
219 }
220 // Make sure we exit this thread correctly when interrupted before going to
221 // indefinite wait.
222 if (Thread.currentThread().isInterrupted()) {
223 throw new InterruptedException();
224 }
225 if (DEBUG) Slog.d(TAG, "LazyTaskWriter: waiting indefinitely.");
226 wait();
227 // Invariant: mNextWriteTime is either FLUSH_QUEUE or PRE_WRITE_DELAY_MS
228 // from now.
229 }
230 item = mWriteQueue.remove(0);
231
232 long now = SystemClock.uptimeMillis();
233 if (DEBUG) {
234 Slog.d(TAG, "LazyTaskWriter: now=" + now + " mNextWriteTime=" + mNextWriteTime
235 + " mWriteQueue.size=" + mWriteQueue.size());
236 }
237 while (now < mNextWriteTime) {
238 if (DEBUG) {
239 Slog.d(TAG, "LazyTaskWriter: waiting " + (mNextWriteTime - now));
240 }
241 wait(mNextWriteTime - now);
242 now = SystemClock.uptimeMillis();
243 }
244
245 // Got something to do.
246 }
247
248 item.process();
249 }
250
/**
 * A unit of work handled by the queue.
 *
 * <p>{@link #process()} is the only abstract method, so an item can be written as a lambda.
 *
 * @param <T> the concrete item type, used as the argument type of {@link #updateFrom} and
 *         {@link #matches}
 */
@FunctionalInterface
interface WriteQueueItem<T extends WriteQueueItem<T>> {
    /** Performs the work of this item. */
    void process();

    /**
     * Takes over the state of {@code item}. The default implementation does nothing.
     *
     * @param item the item to update from
     */
    default void updateFrom(T item) {}

    /**
     * Tells whether {@code item} stands for the same piece of work as this item. The default
     * implementation never matches.
     *
     * @param item the item to compare against
     * @return {@code true} if the two items match
     */
    default boolean matches(T item) {
        return false;
    }
}
260
/** Observer that is called back by the writer thread of a {@link PersisterQueue}. */
interface Listener {
    /**
     * Invoked each time before {@link PersisterQueue} attempts to process its next item.
     *
     * <p>The callback also fires when the queue is empty, right before the thread goes into
     * its indefinite wait. In particular it fires once when the internal thread has just been
     * started and has nothing to process yet.
     *
     * <p>The {@link PersisterQueue} instance is not locked while this callback runs.
     *
     * @param queueEmpty {@code true} if the queue was empty when checked, which indicates
     *         {@link PersisterQueue} is likely to enter indefinite wait; {@code false} if
     *         there is still an item to process.
     */
    void onPreProcessItem(boolean queueEmpty);
}
277
278 private class LazyTaskWriterThread extends Thread {
279
280 private LazyTaskWriterThread(String name) {
281 super(name);
282 }
283
284 @Override
285 public void run() {
286 Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
287 try {
288 while (true) {
289 final boolean probablyDone;
290 synchronized (PersisterQueue.this) {
291 probablyDone = mWriteQueue.isEmpty();
292 }
293
294 for (int i = mListeners.size() - 1; i >= 0; --i) {
295 mListeners.get(i).onPreProcessItem(probablyDone);
296 }
297
298 processNextItem();
299 }
300 } catch (InterruptedException e) {
301 Slog.e(TAG, "Persister thread is exiting. Should never happen in prod, but"
302 + "it's OK in tests.");
303 }
304 }
305 }
306}