blob: b623bfb08d84fa5977cfd8c632029a3e2b77ab7b [file] [log] [blame]
/*
 * Copyright (C) 2011 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
package com.android.volley;

import android.os.Handler;
import android.os.Looper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
32
33/**
34 * A request dispatch queue with a thread pool of dispatchers.
35 *
36 * Calling {@link #add(Request)} will enqueue the given Request for dispatch,
37 * resolving from either cache or network on a worker thread, and then delivering
38 * a parsed response on the main thread.
39 */
40@SuppressWarnings("rawtypes")
41public class RequestQueue {
42
43 /** Used for generating monotonically-increasing sequence numbers for requests. */
44 private static AtomicInteger sSequenceGenerator = new AtomicInteger();
45
46 /**
47 * Staging area for requests that already have a duplicate request in flight.
48 *
49 * <ul>
50 * <li>containsKey(cacheKey) indicates that there is a request in flight for the given cache
51 * key.</li>
52 * <li>get(cacheKey) returns waiting requests for the given cache key. The in flight request
53 * is <em>not</em> contained in that list. Is null if no requests are staged.</li>
54 * </ul>
55 */
56 private final Map<String, Queue<Request>> mWaitingRequests =
57 new HashMap<String, Queue<Request>>();
58
59 /**
60 * The set of all requests currently being processed by this RequestQueue. A Request
61 * will be in this set if it is waiting in any queue or currently being processed by
62 * any dispatcher.
63 */
64 private final Set<Request> mCurrentRequests = new HashSet<Request>();
65
66 /** The cache triage queue. */
67 private final PriorityBlockingQueue<Request> mCacheQueue =
68 new PriorityBlockingQueue<Request>();
69
70 /** The queue of requests that are actually going out to the network. */
71 private final PriorityBlockingQueue<Request> mNetworkQueue =
72 new PriorityBlockingQueue<Request>();
73
74 /** Number of network request dispatcher threads to start. */
75 private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4;
76
77 /** Cache interface for retrieving and storing respones. */
78 private final Cache mCache;
79
80 /** Network interface for performing requests. */
81 private final Network mNetwork;
82
83 /** Response delivery mechanism. */
84 private final ResponseDelivery mDelivery;
85
86 /** The network dispatchers. */
87 private NetworkDispatcher[] mDispatchers;
88
89 /** The cache dispatcher. */
90 private CacheDispatcher mCacheDispatcher;
91
92 /**
93 * Creates the worker pool. Processing will not begin until {@link #start()} is called.
94 *
95 * @param cache A Cache to use for persisting responses to disk
96 * @param network A Network interface for performing HTTP requests
97 * @param threadPoolSize Number of network dispatcher threads to create
98 * @param delivery A ResponseDelivery interface for posting responses and errors
99 */
100 public RequestQueue(Cache cache, Network network, int threadPoolSize,
101 ResponseDelivery delivery) {
102 mCache = cache;
103 mNetwork = network;
104 mDispatchers = new NetworkDispatcher[threadPoolSize];
105 mDelivery = delivery;
106 }
107
108 /**
109 * Creates the worker pool. Processing will not begin until {@link #start()} is called.
110 *
111 * @param cache A Cache to use for persisting responses to disk
112 * @param network A Network interface for performing HTTP requests
113 * @param threadPoolSize Number of network dispatcher threads to create
114 */
115 public RequestQueue(Cache cache, Network network, int threadPoolSize) {
116 this(cache, network, threadPoolSize,
117 new ExecutorDelivery(new Handler(Looper.getMainLooper())));
118 }
119
120 /**
121 * Creates the worker pool. Processing will not begin until {@link #start()} is called.
122 *
123 * @param cache A Cache to use for persisting responses to disk
124 * @param network A Network interface for performing HTTP requests
125 */
126 public RequestQueue(Cache cache, Network network) {
127 this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE);
128 }
129
130 /**
131 * Starts the dispatchers in this queue.
132 */
133 public void start() {
134 stop(); // Make sure any currently running dispatchers are stopped.
135 // Create the cache dispatcher and start it.
136 mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
137 mCacheDispatcher.start();
138
139 // Create network dispatchers (and corresponding threads) up to the pool size.
140 for (int i = 0; i < mDispatchers.length; i++) {
141 NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork,
142 mCache, mDelivery);
143 mDispatchers[i] = networkDispatcher;
144 networkDispatcher.start();
145 }
146 }
147
148 /**
149 * Stops the cache and network dispatchers.
150 */
151 public void stop() {
152 if (mCacheDispatcher != null) {
153 mCacheDispatcher.quit();
154 }
155 for (int i = 0; i < mDispatchers.length; i++) {
156 if (mDispatchers[i] != null) {
157 mDispatchers[i].quit();
158 }
159 }
160 }
161
162 /**
163 * Gets a sequence number.
164 */
165 public static int getSequenceNumber() {
166 return sSequenceGenerator.incrementAndGet();
167 }
168
169 /**
170 * A simple predicate or filter interface for Requests, for use by
171 * {@link RequestQueue#cancelAll(RequestFilter)}.
172 */
173 public interface RequestFilter {
174 public boolean apply(Request<?> request);
175 }
176
177 /**
178 * Cancels all requests in this queue for which the given filter applies.
179 * @param filter The filtering function to use
180 */
181 public void cancelAll(RequestFilter filter) {
182 synchronized (mCurrentRequests) {
183 for (Request<?> request : mCurrentRequests) {
184 if (filter.apply(request)) {
185 request.cancel();
186 }
187 }
188 }
189 }
190
191 /**
192 * Cancels all requests in this queue with the given tag. Tag must be non-null
193 * and equality is by identity.
194 */
195 public void cancelAll(final Object tag) {
196 if (tag == null) {
197 throw new IllegalArgumentException("Cannot cancelAll with a null tag");
198 }
199 cancelAll(new RequestFilter() {
200 @Override
201 public boolean apply(Request<?> request) {
202 return request.getTag() == tag;
203 }
204 });
205 }
206
207 /**
208 * Drains this request queue. All requests currently being processed
209 * or waiting to be processed will be canceled, and no responses, success
210 * or error, will be delivered for them.
211 */
212 @Deprecated
213 public void drain() {
214 drain(getSequenceNumber());
215 }
216
217 /**
218 * Drains this request queue. All requests currently being processed
219 * or waiting to be processed will be canceled, and no responses, success
220 * or error, will be delivered for them.
221 * @param sequenceNumber The sequence number of the drain request. Everything before this number
222 * will be drained.
223 */
224 @Deprecated
225 public void drain(int sequenceNumber) {
226 // Cancel drainable requests.
227 cancelDrainable(mCacheQueue, sequenceNumber);
228 cancelDrainable(mNetworkQueue, sequenceNumber);
229
230 // Tell the delivery to discard all requests with a sequence number below it. We can't stop
231 // requests that are already in flight, but this will suppress their responses
232 // from being delivered.
233 mDelivery.discardBefore(sequenceNumber);
234 if (VolleyLog.DEBUG) {
235 VolleyLog.v("Draining requests with sequence number below %s", sequenceNumber);
236 }
237 }
238
239 /**
240 * Cancels all requests in the provided queue that return true from
241 * {@link Request#isDrainable()} and have a sequence number less than the
242 * provided one.
243 */
244 @SuppressWarnings("deprecation")
245 private void cancelDrainable(PriorityBlockingQueue<Request> queue, int sequenceNumber) {
246 List<Request> pending = new ArrayList<Request>();
247 // Remove all requests from the queue in order to work on them.
248 queue.drainTo(pending);
249 for (Request request : pending) {
250 if (request.isDrainable() && request.getSequence() < sequenceNumber) {
251 request.cancel();
252 }
253 // Put the request back on the queue. If it is canceled, it
254 // will be discarded by one of the dispatchers or the delivery.
255 queue.add(request);
256 }
257 }
258
259 /**
260 * Adds a Request to the dispatch queue.
261 * @param request The request to service
262 * @return The passed-in request
263 */
264 public Request add(Request request) {
265 // Tag the request as belonging to this queue and add it to the set of current requests.
266 request.setRequestQueue(this);
267 synchronized (mCurrentRequests) {
268 mCurrentRequests.add(request);
269 }
270
271 // Process requests in the order they are added.
272 request.setSequence(sSequenceGenerator.incrementAndGet());
273 request.addMarker("add-to-queue");
274
275 // If the request is uncacheable, skip the cache queue and go straight to the network.
276 if (!request.shouldCache()) {
277 mNetworkQueue.add(request);
278 return request;
279 }
280
281 // Insert request into stage if there's already a request with the same cache key in flight.
282 synchronized (mWaitingRequests) {
283 String cacheKey = request.getCacheKey();
284 if (mWaitingRequests.containsKey(cacheKey)) {
285 // There is already a request in flight. Queue up.
286 Queue<Request> stagedRequests = mWaitingRequests.get(cacheKey);
287 if (stagedRequests == null) {
288 stagedRequests = new LinkedList<Request>();
289 }
290 stagedRequests.add(request);
291 mWaitingRequests.put(cacheKey, stagedRequests);
292 if (VolleyLog.DEBUG) {
293 VolleyLog.v("Request for cacheKey=%s is in flight, putting on hold.", cacheKey);
294 }
295 } else {
296 // Insert 'null' queue for this cacheKey, indicating there is now a request in
297 // flight.
298 mWaitingRequests.put(cacheKey, null);
299 mCacheQueue.add(request);
300 }
301 return request;
302 }
303 }
304
305 /**
306 * Called from {@link Request#finish(String)}, indicating that processing of the given request
307 * has finished.
308 *
309 * <p>Releases waiting requests for <code>request.getCacheKey()</code> if
310 * <code>request.shouldCache()</code>.</p>
311 */
312 void finish(Request request) {
313 // Remove from the set of requests currently being processed.
314 synchronized (mCurrentRequests) {
315 mCurrentRequests.remove(request);
316 }
317
318 if (request.shouldCache()) {
319 synchronized (mWaitingRequests) {
320 String cacheKey = request.getCacheKey();
321 Queue<Request> waitingRequests = mWaitingRequests.remove(cacheKey);
322 if (waitingRequests != null) {
323 if (VolleyLog.DEBUG) {
324 VolleyLog.v("Releasing %d waiting requests for cacheKey=%s.",
325 waitingRequests.size(), cacheKey);
326 }
327 // Process all queued up requests. They won't be considered as in flight, but
328 // that's not a problem as the cache has been primed by 'request'.
329 mCacheQueue.addAll(waitingRequests);
330 }
331 }
332 }
333 }
334}