/*
 * Copyright (C) 2011 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.volley;

import android.os.Handler;
import android.os.Looper;

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A request dispatch queue with a thread pool of dispatchers.
 *
 * Calling {@link #add(Request)} will enqueue the given Request for dispatch,
 * resolving from either cache or network on a worker thread, and then delivering
 * a parsed response on the main thread.
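 *
 * <p>A minimal usage sketch; {@code cache}, {@code network}, and {@code request} are assumed
 * to be application-provided {@link Cache}, {@link Network}, and {@link Request} instances
 * that are not defined in this file:</p>
 *
 * <pre>{@code
 * RequestQueue queue = new RequestQueue(cache, network);
 * queue.start();       // spins up the cache dispatcher and the network dispatcher pool
 * queue.add(request);  // the parsed response is later delivered on the main thread
 * }</pre>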
 */
@SuppressWarnings("rawtypes")
public class RequestQueue {

    /** Used for generating monotonically-increasing sequence numbers for requests. */
    private AtomicInteger mSequenceGenerator = new AtomicInteger();

    /**
     * Staging area for requests that already have a duplicate request in flight.
     *
     * <ul>
     *     <li>containsKey(cacheKey) indicates that there is a request in flight for the given
     *         cache key.</li>
     *     <li>get(cacheKey) returns waiting requests for the given cache key. The in flight
     *         request is <em>not</em> contained in that list. Is null if no requests are
     *         staged.</li>
     * </ul>
     */
    private final Map<String, Queue<Request>> mWaitingRequests =
            new HashMap<String, Queue<Request>>();

    /**
     * The set of all requests currently being processed by this RequestQueue. A Request
     * will be in this set if it is waiting in any queue or currently being processed by
     * any dispatcher.
     */
    private final Set<Request> mCurrentRequests = new HashSet<Request>();

    /** The cache triage queue. */
    private final PriorityBlockingQueue<Request> mCacheQueue =
            new PriorityBlockingQueue<Request>();

    /** The queue of requests that are actually going out to the network. */
    private final PriorityBlockingQueue<Request> mNetworkQueue =
            new PriorityBlockingQueue<Request>();

    /** Number of network request dispatcher threads to start. */
    private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4;

    /** Cache interface for retrieving and storing responses. */
    private final Cache mCache;

    /** Network interface for performing requests. */
    private final Network mNetwork;

    /** Response delivery mechanism. */
    private final ResponseDelivery mDelivery;

    /** The network dispatchers. */
    private NetworkDispatcher[] mDispatchers;

    /** The cache dispatcher. */
    private CacheDispatcher mCacheDispatcher;

    /**
     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
     *
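     * <p>For example, to deliver parsed responses somewhere other than the main thread
     * (a sketch; the {@code HandlerThread} setup is illustrative, and {@code cache} and
     * {@code network} are assumed to be existing Cache and Network implementations):</p>
     *
     * <pre>{@code
     * HandlerThread deliveryThread = new HandlerThread("volley-delivery");
     * deliveryThread.start();
     * ResponseDelivery delivery =
     *         new ExecutorDelivery(new Handler(deliveryThread.getLooper()));
     * RequestQueue queue = new RequestQueue(cache, network, 4, delivery);
     * }</pre>
     *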
     * @param cache A Cache to use for persisting responses to disk
     * @param network A Network interface for performing HTTP requests
     * @param threadPoolSize Number of network dispatcher threads to create
     * @param delivery A ResponseDelivery interface for posting responses and errors
     */
    public RequestQueue(Cache cache, Network network, int threadPoolSize,
            ResponseDelivery delivery) {
        mCache = cache;
        mNetwork = network;
        mDispatchers = new NetworkDispatcher[threadPoolSize];
        mDelivery = delivery;
    }

    /**
     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
     *
     * @param cache A Cache to use for persisting responses to disk
     * @param network A Network interface for performing HTTP requests
     * @param threadPoolSize Number of network dispatcher threads to create
     */
    public RequestQueue(Cache cache, Network network, int threadPoolSize) {
        this(cache, network, threadPoolSize,
                new ExecutorDelivery(new Handler(Looper.getMainLooper())));
    }

    /**
     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
     *
     * @param cache A Cache to use for persisting responses to disk
     * @param network A Network interface for performing HTTP requests
     */
    public RequestQueue(Cache cache, Network network) {
        this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE);
    }

    /**
     * Starts the dispatchers in this queue.
     */
    public void start() {
        stop();  // Make sure any currently running dispatchers are stopped.
        // Create the cache dispatcher and start it.
        mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
        mCacheDispatcher.start();

        // Create network dispatchers (and corresponding threads) up to the pool size.
        for (int i = 0; i < mDispatchers.length; i++) {
            NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork,
                    mCache, mDelivery);
            mDispatchers[i] = networkDispatcher;
            networkDispatcher.start();
        }
    }

    /**
     * Stops the cache and network dispatchers.
     */
    public void stop() {
        if (mCacheDispatcher != null) {
            mCacheDispatcher.quit();
        }
        for (int i = 0; i < mDispatchers.length; i++) {
            if (mDispatchers[i] != null) {
                mDispatchers[i].quit();
            }
        }
    }

    /**
     * Gets a sequence number.
     */
    public int getSequenceNumber() {
        return mSequenceGenerator.incrementAndGet();
    }

    /**
     * Gets the {@link Cache} instance being used.
     */
    public Cache getCache() {
        return mCache;
    }

    /**
     * A simple predicate or filter interface for Requests, for use by
     * {@link RequestQueue#cancelAll(RequestFilter)}.
     */
    public interface RequestFilter {
        public boolean apply(Request<?> request);
    }

    /**
     * Cancels all requests in this queue for which the given filter applies.
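     *
     * <p>For example, to cancel everything currently tracked by this queue (a sketch;
     * {@code queue} is assumed to be an already-constructed RequestQueue):</p>
     *
     * <pre>{@code
     * queue.cancelAll(new RequestQueue.RequestFilter() {
     *     public boolean apply(Request<?> request) {
     *         return true;  // match, and therefore cancel, every tracked request
     *     }
     * });
     * }</pre>
     *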
     * @param filter The filtering function to use
     */
    public void cancelAll(RequestFilter filter) {
        synchronized (mCurrentRequests) {
            for (Request<?> request : mCurrentRequests) {
                if (filter.apply(request)) {
                    request.cancel();
                }
            }
        }
    }

    /**
     * Cancels all requests in this queue with the given tag. The tag must be non-null,
     * and tags are compared by identity (==) rather than {@code equals()}.
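     *
     * <p>A typical pattern (a sketch; {@code queue} and {@code request} are assumed to be
     * existing RequestQueue and Request instances, and {@code setTag} is assumed to be the
     * setter counterpart of {@link Request#getTag()}):</p>
     *
     * <pre>{@code
     * request.setTag(this);   // e.g. tag requests with the enclosing Activity (assumed setter)
     * queue.add(request);
     * // ... later, when the responses are no longer needed:
     * queue.cancelAll(this);  // cancels every request tagged with this exact object
     * }</pre>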
     */
    public void cancelAll(final Object tag) {
        if (tag == null) {
            throw new IllegalArgumentException("Cannot cancelAll with a null tag");
        }
        cancelAll(new RequestFilter() {
            @Override
            public boolean apply(Request<?> request) {
                return request.getTag() == tag;
            }
        });
    }

    /**
     * Adds a Request to the dispatch queue.
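     *
     * <p>For example (a sketch; {@code queue} and {@code request} are assumed to be existing
     * RequestQueue and Request instances, and {@code setShouldCache} is assumed to be the
     * setter counterpart of {@link Request#shouldCache()}):</p>
     *
     * <pre>{@code
     * request.setShouldCache(false);  // optional: bypass the cache queue (assumed setter)
     * queue.add(request);             // returns the same request that was passed in
     * }</pre>
     *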
     * @param request The request to service
     * @return The passed-in request
     */
    public Request add(Request request) {
        // Tag the request as belonging to this queue and add it to the set of current requests.
        request.setRequestQueue(this);
        synchronized (mCurrentRequests) {
            mCurrentRequests.add(request);
        }

        // Process requests in the order they are added.
        request.setSequence(getSequenceNumber());
        request.addMarker("add-to-queue");

        // If the request is uncacheable, skip the cache queue and go straight to the network.
        if (!request.shouldCache()) {
            mNetworkQueue.add(request);
            return request;
        }

        // Insert the request into the staging area if there is already a request with the same
        // cache key in flight.
        synchronized (mWaitingRequests) {
            String cacheKey = request.getCacheKey();
            if (mWaitingRequests.containsKey(cacheKey)) {
                // There is already a request in flight. Queue up.
                Queue<Request> stagedRequests = mWaitingRequests.get(cacheKey);
                if (stagedRequests == null) {
                    stagedRequests = new LinkedList<Request>();
                }
                stagedRequests.add(request);
                mWaitingRequests.put(cacheKey, stagedRequests);
                if (VolleyLog.DEBUG) {
                    VolleyLog.v("Request for cacheKey=%s is in flight, putting on hold.", cacheKey);
                }
            } else {
                // Insert 'null' queue for this cacheKey, indicating there is now a request in
                // flight.
                mWaitingRequests.put(cacheKey, null);
                mCacheQueue.add(request);
            }
            return request;
        }
    }

    /**
     * Called from {@link Request#finish(String)}, indicating that processing of the given request
     * has finished.
     *
     * <p>Releases waiting requests for <code>request.getCacheKey()</code> if
     * <code>request.shouldCache()</code>.</p>
     */
    void finish(Request request) {
        // Remove from the set of requests currently being processed.
        synchronized (mCurrentRequests) {
            mCurrentRequests.remove(request);
        }

        if (request.shouldCache()) {
            synchronized (mWaitingRequests) {
                String cacheKey = request.getCacheKey();
                Queue<Request> waitingRequests = mWaitingRequests.remove(cacheKey);
                if (waitingRequests != null) {
                    if (VolleyLog.DEBUG) {
                        VolleyLog.v("Releasing %d waiting requests for cacheKey=%s.",
                                waitingRequests.size(), cacheKey);
                    }
                    // Process all queued up requests. They won't be considered as in flight, but
                    // that's not a problem as the cache has been primed by 'request'.
                    mCacheQueue.addAll(waitingRequests);
                }
            }
        }
    }
}