blob: 1cfc7acbb3df561644da04f9122020d49def1220 [file] [log] [blame]
Garfield Tana3549832018-09-24 15:22:18 -07001/*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Wale Ogunwale59507092018-10-29 09:00:30 -070017package com.android.server.wm;
Garfield Tana3549832018-09-24 15:22:18 -070018
19import android.os.Process;
20import android.os.SystemClock;
21import android.util.Slog;
22
23import com.android.internal.annotations.VisibleForTesting;
24
25import java.util.ArrayList;
26import java.util.function.Predicate;
27
28/**
29 * The common threading logic for persisters to use so that they can run in the same threads.
30 * Methods in this class are synchronized on its instance, so caller could also synchronize on
31 * its instance to perform modifications in items.
32 */
33class PersisterQueue {
34 private static final String TAG = "PersisterQueue";
35 private static final boolean DEBUG = false;
36
37 /** When not flushing don't write out files faster than this */
38 private static final long INTER_WRITE_DELAY_MS = 500;
39
40 /**
41 * When not flushing delay this long before writing the first file out. This gives the next task
42 * being launched a chance to load its resources without this occupying IO bandwidth.
43 */
44 private static final long PRE_TASK_DELAY_MS = 3000;
45
46 /** The maximum number of entries to keep in the queue before draining it automatically. */
47 private static final int MAX_WRITE_QUEUE_LENGTH = 6;
48
49 /** Special value for mWriteTime to mean don't wait, just write */
50 private static final long FLUSH_QUEUE = -1;
51
52 /** An {@link WriteQueueItem} that doesn't do anything. Used to trigger {@link
53 * Listener#onPreProcessItem}. */
54 static final WriteQueueItem EMPTY_ITEM = () -> { };
55
56 private final long mInterWriteDelayMs;
57 private final long mPreTaskDelayMs;
58 private final LazyTaskWriterThread mLazyTaskWriterThread;
59 private final ArrayList<WriteQueueItem> mWriteQueue = new ArrayList<>();
60
61 private final ArrayList<Listener> mListeners = new ArrayList<>();
62
63 /**
64 * Value determines write delay mode as follows: < 0 We are Flushing. No delays between writes
65 * until the image queue is drained and all tasks needing persisting are written to disk. There
66 * is no delay between writes. == 0 We are Idle. Next writes will be delayed by
67 * #PRE_TASK_DELAY_MS. > 0 We are Actively writing. Next write will be at this time. Subsequent
68 * writes will be delayed by #INTER_WRITE_DELAY_MS.
69 */
70 private long mNextWriteTime = 0;
71
72 PersisterQueue() {
73 this(INTER_WRITE_DELAY_MS, PRE_TASK_DELAY_MS);
74 }
75
76 /** Used for tests to reduce waiting time. */
77 @VisibleForTesting
78 PersisterQueue(long interWriteDelayMs, long preTaskDelayMs) {
79 if (interWriteDelayMs < 0 || preTaskDelayMs < 0) {
80 throw new IllegalArgumentException("Both inter-write delay and pre-task delay need to"
81 + "be non-negative. inter-write delay: " + interWriteDelayMs
82 + "ms pre-task delay: " + preTaskDelayMs);
83 }
84 mInterWriteDelayMs = interWriteDelayMs;
85 mPreTaskDelayMs = preTaskDelayMs;
86 mLazyTaskWriterThread = new LazyTaskWriterThread("LazyTaskWriterThread");
87 }
88
89 synchronized void startPersisting() {
90 if (!mLazyTaskWriterThread.isAlive()) {
91 mLazyTaskWriterThread.start();
92 }
93 }
94
95 /** Stops persisting thread. Should only be used in tests. */
96 @VisibleForTesting
97 void stopPersisting() throws InterruptedException {
98 if (!mLazyTaskWriterThread.isAlive()) {
99 return;
100 }
101
102 synchronized (this) {
103 mLazyTaskWriterThread.interrupt();
104 }
105 mLazyTaskWriterThread.join();
106 }
107
108 synchronized void addItem(WriteQueueItem item, boolean flush) {
109 mWriteQueue.add(item);
110
111 if (flush || mWriteQueue.size() > MAX_WRITE_QUEUE_LENGTH) {
112 mNextWriteTime = FLUSH_QUEUE;
113 } else if (mNextWriteTime == 0) {
114 mNextWriteTime = SystemClock.uptimeMillis() + mPreTaskDelayMs;
115 }
116 notify();
117 }
118
119 synchronized <T extends WriteQueueItem> T findLastItem(Predicate<T> predicate, Class<T> clazz) {
120 for (int i = mWriteQueue.size() - 1; i >= 0; --i) {
121 WriteQueueItem writeQueueItem = mWriteQueue.get(i);
122 if (clazz.isInstance(writeQueueItem)) {
123 T item = clazz.cast(writeQueueItem);
124 if (predicate.test(item)) {
125 return item;
126 }
127 }
128 }
129
130 return null;
131 }
132
133 synchronized <T extends WriteQueueItem> void removeItems(Predicate<T> predicate,
134 Class<T> clazz) {
135 for (int i = mWriteQueue.size() - 1; i >= 0; --i) {
136 WriteQueueItem writeQueueItem = mWriteQueue.get(i);
137 if (clazz.isInstance(writeQueueItem)) {
138 T item = clazz.cast(writeQueueItem);
139 if (predicate.test(item)) {
140 if (DEBUG) Slog.d(TAG, "Removing " + item + " from write queue.");
141 mWriteQueue.remove(i);
142 }
143 }
144 }
145 }
146
147 synchronized void flush() {
148 mNextWriteTime = FLUSH_QUEUE;
149 notifyAll();
150 do {
151 try {
152 wait();
153 } catch (InterruptedException e) {
154 }
155 } while (mNextWriteTime == FLUSH_QUEUE);
156 }
157
158 void yieldIfQueueTooDeep() {
159 boolean stall = false;
160 synchronized (this) {
161 if (mNextWriteTime == FLUSH_QUEUE) {
162 stall = true;
163 }
164 }
165 if (stall) {
166 Thread.yield();
167 }
168 }
169
170 void addListener(Listener listener) {
171 mListeners.add(listener);
172 }
173
Tadashi G. Takaoka74ccec22018-10-23 11:07:13 +0900174 @VisibleForTesting
175 boolean removeListener(Listener listener) {
176 return mListeners.remove(listener);
177 }
178
Garfield Tana3549832018-09-24 15:22:18 -0700179 private void processNextItem() throws InterruptedException {
180 // This part is extracted into a method so that the GC can clearly see the end of the
181 // scope of the variable 'item'. If this part was in the loop in LazyTaskWriterThread, the
182 // last item it processed would always "leak".
183 // See https://b.corp.google.com/issues/64438652#comment7
184
185 // If mNextWriteTime, then don't delay between each call to saveToXml().
186 final WriteQueueItem item;
187 synchronized (this) {
188 if (mNextWriteTime != FLUSH_QUEUE) {
189 // The next write we don't have to wait so long.
190 mNextWriteTime = SystemClock.uptimeMillis() + mInterWriteDelayMs;
191 if (DEBUG) {
192 Slog.d(TAG, "Next write time may be in " + mInterWriteDelayMs
193 + " msec. (" + mNextWriteTime + ")");
194 }
195 }
196
197 while (mWriteQueue.isEmpty()) {
198 if (mNextWriteTime != 0) {
199 mNextWriteTime = 0; // idle.
200 notify(); // May need to wake up flush().
201 }
202 // Make sure we exit this thread correctly when interrupted before going to
203 // indefinite wait.
204 if (Thread.currentThread().isInterrupted()) {
205 throw new InterruptedException();
206 }
207 if (DEBUG) Slog.d(TAG, "LazyTaskWriter: waiting indefinitely.");
208 wait();
209 // Invariant: mNextWriteTime is either FLUSH_QUEUE or PRE_WRITE_DELAY_MS
210 // from now.
211 }
212 item = mWriteQueue.remove(0);
213
214 long now = SystemClock.uptimeMillis();
215 if (DEBUG) {
216 Slog.d(TAG, "LazyTaskWriter: now=" + now + " mNextWriteTime=" + mNextWriteTime
217 + " mWriteQueue.size=" + mWriteQueue.size());
218 }
219 while (now < mNextWriteTime) {
220 if (DEBUG) {
221 Slog.d(TAG, "LazyTaskWriter: waiting " + (mNextWriteTime - now));
222 }
223 wait(mNextWriteTime - now);
224 now = SystemClock.uptimeMillis();
225 }
226
227 // Got something to do.
228 }
229
230 item.process();
231 }
232
233 interface WriteQueueItem {
234 void process();
235 }
236
237 interface Listener {
238 /**
239 * Called before {@link PersisterQueue} tries to process next item.
240 *
241 * Note if the queue is empty, this callback will be called before the indefinite wait. This
242 * will be called once when {@link PersisterQueue} starts the internal thread before the
243 * indefinite wait.
244 *
245 * This callback is called w/o locking the instance of {@link PersisterQueue}.
246 *
247 * @param queueEmpty {@code true} if the queue is empty, which indicates {@link
248 * PersisterQueue} is likely to enter indefinite wait; or {@code false} if there is still
249 * item to process.
250 */
251 void onPreProcessItem(boolean queueEmpty);
252 }
253
254 private class LazyTaskWriterThread extends Thread {
255
256 private LazyTaskWriterThread(String name) {
257 super(name);
258 }
259
260 @Override
261 public void run() {
262 Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
263 try {
264 while (true) {
265 final boolean probablyDone;
266 synchronized (PersisterQueue.this) {
267 probablyDone = mWriteQueue.isEmpty();
268 }
269
270 for (int i = mListeners.size() - 1; i >= 0; --i) {
271 mListeners.get(i).onPreProcessItem(probablyDone);
272 }
273
274 processNextItem();
275 }
276 } catch (InterruptedException e) {
277 Slog.e(TAG, "Persister thread is exiting. Should never happen in prod, but"
278 + "it's OK in tests.");
279 }
280 }
281 }
282}