blob: a17ee6581ea67ead25499c9c664775a31456d119 [file] [log] [blame]
/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Wale Ogunwale59507092018-10-29 09:00:30 -070017package com.android.server.wm;
Garfield Tana3549832018-09-24 15:22:18 -070018
19import android.os.Process;
20import android.os.SystemClock;
21import android.util.Slog;
22
23import com.android.internal.annotations.VisibleForTesting;
24
25import java.util.ArrayList;
26import java.util.function.Predicate;
27
28/**
29 * The common threading logic for persisters to use so that they can run in the same threads.
30 * Methods in this class are synchronized on its instance, so caller could also synchronize on
31 * its instance to perform modifications in items.
32 */
33class PersisterQueue {
34 private static final String TAG = "PersisterQueue";
35 private static final boolean DEBUG = false;
36
37 /** When not flushing don't write out files faster than this */
38 private static final long INTER_WRITE_DELAY_MS = 500;
39
40 /**
41 * When not flushing delay this long before writing the first file out. This gives the next task
42 * being launched a chance to load its resources without this occupying IO bandwidth.
43 */
44 private static final long PRE_TASK_DELAY_MS = 3000;
45
46 /** The maximum number of entries to keep in the queue before draining it automatically. */
47 private static final int MAX_WRITE_QUEUE_LENGTH = 6;
48
49 /** Special value for mWriteTime to mean don't wait, just write */
50 private static final long FLUSH_QUEUE = -1;
51
52 /** An {@link WriteQueueItem} that doesn't do anything. Used to trigger {@link
53 * Listener#onPreProcessItem}. */
54 static final WriteQueueItem EMPTY_ITEM = () -> { };
55
56 private final long mInterWriteDelayMs;
57 private final long mPreTaskDelayMs;
58 private final LazyTaskWriterThread mLazyTaskWriterThread;
59 private final ArrayList<WriteQueueItem> mWriteQueue = new ArrayList<>();
60
61 private final ArrayList<Listener> mListeners = new ArrayList<>();
62
63 /**
64 * Value determines write delay mode as follows: < 0 We are Flushing. No delays between writes
65 * until the image queue is drained and all tasks needing persisting are written to disk. There
66 * is no delay between writes. == 0 We are Idle. Next writes will be delayed by
67 * #PRE_TASK_DELAY_MS. > 0 We are Actively writing. Next write will be at this time. Subsequent
68 * writes will be delayed by #INTER_WRITE_DELAY_MS.
69 */
70 private long mNextWriteTime = 0;
71
72 PersisterQueue() {
73 this(INTER_WRITE_DELAY_MS, PRE_TASK_DELAY_MS);
74 }
75
76 /** Used for tests to reduce waiting time. */
77 @VisibleForTesting
78 PersisterQueue(long interWriteDelayMs, long preTaskDelayMs) {
79 if (interWriteDelayMs < 0 || preTaskDelayMs < 0) {
80 throw new IllegalArgumentException("Both inter-write delay and pre-task delay need to"
81 + "be non-negative. inter-write delay: " + interWriteDelayMs
82 + "ms pre-task delay: " + preTaskDelayMs);
83 }
84 mInterWriteDelayMs = interWriteDelayMs;
85 mPreTaskDelayMs = preTaskDelayMs;
86 mLazyTaskWriterThread = new LazyTaskWriterThread("LazyTaskWriterThread");
87 }
88
89 synchronized void startPersisting() {
90 if (!mLazyTaskWriterThread.isAlive()) {
91 mLazyTaskWriterThread.start();
92 }
93 }
94
95 /** Stops persisting thread. Should only be used in tests. */
96 @VisibleForTesting
97 void stopPersisting() throws InterruptedException {
98 if (!mLazyTaskWriterThread.isAlive()) {
99 return;
100 }
101
102 synchronized (this) {
103 mLazyTaskWriterThread.interrupt();
104 }
105 mLazyTaskWriterThread.join();
106 }
107
108 synchronized void addItem(WriteQueueItem item, boolean flush) {
109 mWriteQueue.add(item);
110
111 if (flush || mWriteQueue.size() > MAX_WRITE_QUEUE_LENGTH) {
112 mNextWriteTime = FLUSH_QUEUE;
113 } else if (mNextWriteTime == 0) {
114 mNextWriteTime = SystemClock.uptimeMillis() + mPreTaskDelayMs;
115 }
116 notify();
117 }
118
119 synchronized <T extends WriteQueueItem> T findLastItem(Predicate<T> predicate, Class<T> clazz) {
120 for (int i = mWriteQueue.size() - 1; i >= 0; --i) {
121 WriteQueueItem writeQueueItem = mWriteQueue.get(i);
122 if (clazz.isInstance(writeQueueItem)) {
123 T item = clazz.cast(writeQueueItem);
124 if (predicate.test(item)) {
125 return item;
126 }
127 }
128 }
129
130 return null;
131 }
132
Garfield Tan891146c2018-10-09 12:14:00 -0700133 /**
134 *
135 * @param item
136 * @param flush
137 * @param <T>
138 */
139 synchronized <T extends WriteQueueItem> void updateLastOrAddItem(T item, boolean flush) {
140 final T itemToUpdate = findLastItem(item::matches, (Class<T>) item.getClass());
141 if (itemToUpdate == null) {
142 addItem(item, flush);
143 } else {
144 itemToUpdate.updateFrom(item);
145 }
146
147 yieldIfQueueTooDeep();
148 }
149
150 /**
151 * Removes all items with which given predicate returns {@code true}.
152 *
153 * @param predicate the predicate
154 * @param clazz
155 * @param <T>
156 */
Garfield Tana3549832018-09-24 15:22:18 -0700157 synchronized <T extends WriteQueueItem> void removeItems(Predicate<T> predicate,
158 Class<T> clazz) {
159 for (int i = mWriteQueue.size() - 1; i >= 0; --i) {
160 WriteQueueItem writeQueueItem = mWriteQueue.get(i);
161 if (clazz.isInstance(writeQueueItem)) {
162 T item = clazz.cast(writeQueueItem);
163 if (predicate.test(item)) {
164 if (DEBUG) Slog.d(TAG, "Removing " + item + " from write queue.");
165 mWriteQueue.remove(i);
166 }
167 }
168 }
169 }
170
171 synchronized void flush() {
172 mNextWriteTime = FLUSH_QUEUE;
173 notifyAll();
174 do {
175 try {
176 wait();
177 } catch (InterruptedException e) {
178 }
179 } while (mNextWriteTime == FLUSH_QUEUE);
180 }
181
182 void yieldIfQueueTooDeep() {
183 boolean stall = false;
184 synchronized (this) {
185 if (mNextWriteTime == FLUSH_QUEUE) {
186 stall = true;
187 }
188 }
189 if (stall) {
190 Thread.yield();
191 }
192 }
193
194 void addListener(Listener listener) {
195 mListeners.add(listener);
196 }
197
Tadashi G. Takaoka74ccec22018-10-23 11:07:13 +0900198 @VisibleForTesting
199 boolean removeListener(Listener listener) {
200 return mListeners.remove(listener);
201 }
202
Garfield Tana3549832018-09-24 15:22:18 -0700203 private void processNextItem() throws InterruptedException {
204 // This part is extracted into a method so that the GC can clearly see the end of the
205 // scope of the variable 'item'. If this part was in the loop in LazyTaskWriterThread, the
206 // last item it processed would always "leak".
207 // See https://b.corp.google.com/issues/64438652#comment7
208
209 // If mNextWriteTime, then don't delay between each call to saveToXml().
210 final WriteQueueItem item;
211 synchronized (this) {
212 if (mNextWriteTime != FLUSH_QUEUE) {
213 // The next write we don't have to wait so long.
214 mNextWriteTime = SystemClock.uptimeMillis() + mInterWriteDelayMs;
215 if (DEBUG) {
216 Slog.d(TAG, "Next write time may be in " + mInterWriteDelayMs
217 + " msec. (" + mNextWriteTime + ")");
218 }
219 }
220
221 while (mWriteQueue.isEmpty()) {
222 if (mNextWriteTime != 0) {
223 mNextWriteTime = 0; // idle.
224 notify(); // May need to wake up flush().
225 }
226 // Make sure we exit this thread correctly when interrupted before going to
227 // indefinite wait.
228 if (Thread.currentThread().isInterrupted()) {
229 throw new InterruptedException();
230 }
231 if (DEBUG) Slog.d(TAG, "LazyTaskWriter: waiting indefinitely.");
232 wait();
233 // Invariant: mNextWriteTime is either FLUSH_QUEUE or PRE_WRITE_DELAY_MS
234 // from now.
235 }
236 item = mWriteQueue.remove(0);
237
238 long now = SystemClock.uptimeMillis();
239 if (DEBUG) {
240 Slog.d(TAG, "LazyTaskWriter: now=" + now + " mNextWriteTime=" + mNextWriteTime
241 + " mWriteQueue.size=" + mWriteQueue.size());
242 }
243 while (now < mNextWriteTime) {
244 if (DEBUG) {
245 Slog.d(TAG, "LazyTaskWriter: waiting " + (mNextWriteTime - now));
246 }
247 wait(mNextWriteTime - now);
248 now = SystemClock.uptimeMillis();
249 }
250
251 // Got something to do.
252 }
253
254 item.process();
255 }
256
Garfield Tan891146c2018-10-09 12:14:00 -0700257 interface WriteQueueItem<T extends WriteQueueItem<T>> {
Garfield Tana3549832018-09-24 15:22:18 -0700258 void process();
Garfield Tan891146c2018-10-09 12:14:00 -0700259
260 default void updateFrom(T item) {}
261
262 default boolean matches(T item) {
263 return false;
264 }
Garfield Tana3549832018-09-24 15:22:18 -0700265 }
266
267 interface Listener {
268 /**
269 * Called before {@link PersisterQueue} tries to process next item.
270 *
271 * Note if the queue is empty, this callback will be called before the indefinite wait. This
272 * will be called once when {@link PersisterQueue} starts the internal thread before the
273 * indefinite wait.
274 *
275 * This callback is called w/o locking the instance of {@link PersisterQueue}.
276 *
277 * @param queueEmpty {@code true} if the queue is empty, which indicates {@link
278 * PersisterQueue} is likely to enter indefinite wait; or {@code false} if there is still
279 * item to process.
280 */
281 void onPreProcessItem(boolean queueEmpty);
282 }
283
284 private class LazyTaskWriterThread extends Thread {
285
286 private LazyTaskWriterThread(String name) {
287 super(name);
288 }
289
290 @Override
291 public void run() {
292 Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
293 try {
294 while (true) {
295 final boolean probablyDone;
296 synchronized (PersisterQueue.this) {
297 probablyDone = mWriteQueue.isEmpty();
298 }
299
300 for (int i = mListeners.size() - 1; i >= 0; --i) {
301 mListeners.get(i).onPreProcessItem(probablyDone);
302 }
303
304 processNextItem();
305 }
306 } catch (InterruptedException e) {
307 Slog.e(TAG, "Persister thread is exiting. Should never happen in prod, but"
308 + "it's OK in tests.");
309 }
310 }
311 }
312}