/*
 * Copyright (C) 2011 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.volley;

import android.os.Handler;
import android.os.Looper;

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

31/**
32 * A request dispatch queue with a thread pool of dispatchers.
33 *
34 * Calling {@link #add(Request)} will enqueue the given Request for dispatch,
35 * resolving from either cache or network on a worker thread, and then delivering
36 * a parsed response on the main thread.
37 */
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -080038public class RequestQueue {
39
40 /** Used for generating monotonically-increasing sequence numbers for requests. */
Jean-Baptiste Queru6772bce2012-11-07 07:51:33 -080041 private AtomicInteger mSequenceGenerator = new AtomicInteger();
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -080042
43 /**
44 * Staging area for requests that already have a duplicate request in flight.
45 *
46 * <ul>
47 * <li>containsKey(cacheKey) indicates that there is a request in flight for the given cache
48 * key.</li>
49 * <li>get(cacheKey) returns waiting requests for the given cache key. The in flight request
50 * is <em>not</em> contained in that list. Is null if no requests are staged.</li>
51 * </ul>
52 */
Ficus Kirkpatrick35d5cc32014-02-08 11:15:04 -080053 private final Map<String, Queue<Request<?>>> mWaitingRequests =
54 new HashMap<String, Queue<Request<?>>>();
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -080055
56 /**
57 * The set of all requests currently being processed by this RequestQueue. A Request
58 * will be in this set if it is waiting in any queue or currently being processed by
59 * any dispatcher.
60 */
Ficus Kirkpatrick35d5cc32014-02-08 11:15:04 -080061 private final Set<Request<?>> mCurrentRequests = new HashSet<Request<?>>();
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -080062
63 /** The cache triage queue. */
Ficus Kirkpatrick35d5cc32014-02-08 11:15:04 -080064 private final PriorityBlockingQueue<Request<?>> mCacheQueue =
65 new PriorityBlockingQueue<Request<?>>();
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -080066
67 /** The queue of requests that are actually going out to the network. */
Ficus Kirkpatrick35d5cc32014-02-08 11:15:04 -080068 private final PriorityBlockingQueue<Request<?>> mNetworkQueue =
69 new PriorityBlockingQueue<Request<?>>();
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -080070
71 /** Number of network request dispatcher threads to start. */
72 private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4;
73
Ficus Kirkpatrick35d5cc32014-02-08 11:15:04 -080074 /** Cache interface for retrieving and storing responses. */
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -080075 private final Cache mCache;
76
77 /** Network interface for performing requests. */
78 private final Network mNetwork;
79
80 /** Response delivery mechanism. */
81 private final ResponseDelivery mDelivery;
82
83 /** The network dispatchers. */
84 private NetworkDispatcher[] mDispatchers;
85
86 /** The cache dispatcher. */
87 private CacheDispatcher mCacheDispatcher;
88
89 /**
90 * Creates the worker pool. Processing will not begin until {@link #start()} is called.
91 *
92 * @param cache A Cache to use for persisting responses to disk
93 * @param network A Network interface for performing HTTP requests
94 * @param threadPoolSize Number of network dispatcher threads to create
95 * @param delivery A ResponseDelivery interface for posting responses and errors
96 */
97 public RequestQueue(Cache cache, Network network, int threadPoolSize,
98 ResponseDelivery delivery) {
99 mCache = cache;
100 mNetwork = network;
101 mDispatchers = new NetworkDispatcher[threadPoolSize];
102 mDelivery = delivery;
103 }
104
105 /**
106 * Creates the worker pool. Processing will not begin until {@link #start()} is called.
107 *
108 * @param cache A Cache to use for persisting responses to disk
109 * @param network A Network interface for performing HTTP requests
110 * @param threadPoolSize Number of network dispatcher threads to create
111 */
112 public RequestQueue(Cache cache, Network network, int threadPoolSize) {
113 this(cache, network, threadPoolSize,
114 new ExecutorDelivery(new Handler(Looper.getMainLooper())));
115 }
116
117 /**
118 * Creates the worker pool. Processing will not begin until {@link #start()} is called.
119 *
120 * @param cache A Cache to use for persisting responses to disk
121 * @param network A Network interface for performing HTTP requests
122 */
123 public RequestQueue(Cache cache, Network network) {
124 this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE);
125 }
126
127 /**
128 * Starts the dispatchers in this queue.
129 */
130 public void start() {
131 stop(); // Make sure any currently running dispatchers are stopped.
132 // Create the cache dispatcher and start it.
133 mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
134 mCacheDispatcher.start();
135
136 // Create network dispatchers (and corresponding threads) up to the pool size.
137 for (int i = 0; i < mDispatchers.length; i++) {
138 NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork,
139 mCache, mDelivery);
140 mDispatchers[i] = networkDispatcher;
141 networkDispatcher.start();
142 }
143 }
144
145 /**
146 * Stops the cache and network dispatchers.
147 */
148 public void stop() {
149 if (mCacheDispatcher != null) {
150 mCacheDispatcher.quit();
151 }
152 for (int i = 0; i < mDispatchers.length; i++) {
153 if (mDispatchers[i] != null) {
154 mDispatchers[i].quit();
155 }
156 }
157 }
158
159 /**
160 * Gets a sequence number.
161 */
Jean-Baptiste Queru6772bce2012-11-07 07:51:33 -0800162 public int getSequenceNumber() {
163 return mSequenceGenerator.incrementAndGet();
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -0800164 }
165
166 /**
Evan Charlton0acc7932012-11-06 13:58:19 -0800167 * Gets the {@link Cache} instance being used.
168 */
169 public Cache getCache() {
170 return mCache;
171 }
172
173 /**
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -0800174 * A simple predicate or filter interface for Requests, for use by
175 * {@link RequestQueue#cancelAll(RequestFilter)}.
176 */
177 public interface RequestFilter {
178 public boolean apply(Request<?> request);
179 }
180
181 /**
182 * Cancels all requests in this queue for which the given filter applies.
183 * @param filter The filtering function to use
184 */
185 public void cancelAll(RequestFilter filter) {
186 synchronized (mCurrentRequests) {
187 for (Request<?> request : mCurrentRequests) {
188 if (filter.apply(request)) {
189 request.cancel();
190 }
191 }
192 }
193 }
194
195 /**
196 * Cancels all requests in this queue with the given tag. Tag must be non-null
197 * and equality is by identity.
198 */
199 public void cancelAll(final Object tag) {
200 if (tag == null) {
201 throw new IllegalArgumentException("Cannot cancelAll with a null tag");
202 }
203 cancelAll(new RequestFilter() {
204 @Override
205 public boolean apply(Request<?> request) {
206 return request.getTag() == tag;
207 }
208 });
209 }
210
211 /**
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -0800212 * Adds a Request to the dispatch queue.
213 * @param request The request to service
214 * @return The passed-in request
215 */
Max Cai393504e2014-02-10 17:08:32 +0000216 public <T> Request<T> add(Request<T> request) {
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -0800217 // Tag the request as belonging to this queue and add it to the set of current requests.
218 request.setRequestQueue(this);
219 synchronized (mCurrentRequests) {
220 mCurrentRequests.add(request);
221 }
222
223 // Process requests in the order they are added.
Jean-Baptiste Queru6772bce2012-11-07 07:51:33 -0800224 request.setSequence(getSequenceNumber());
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -0800225 request.addMarker("add-to-queue");
226
227 // If the request is uncacheable, skip the cache queue and go straight to the network.
228 if (!request.shouldCache()) {
229 mNetworkQueue.add(request);
230 return request;
231 }
232
233 // Insert request into stage if there's already a request with the same cache key in flight.
234 synchronized (mWaitingRequests) {
235 String cacheKey = request.getCacheKey();
236 if (mWaitingRequests.containsKey(cacheKey)) {
237 // There is already a request in flight. Queue up.
Ficus Kirkpatrick35d5cc32014-02-08 11:15:04 -0800238 Queue<Request<?>> stagedRequests = mWaitingRequests.get(cacheKey);
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -0800239 if (stagedRequests == null) {
Ficus Kirkpatrick35d5cc32014-02-08 11:15:04 -0800240 stagedRequests = new LinkedList<Request<?>>();
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -0800241 }
242 stagedRequests.add(request);
243 mWaitingRequests.put(cacheKey, stagedRequests);
244 if (VolleyLog.DEBUG) {
245 VolleyLog.v("Request for cacheKey=%s is in flight, putting on hold.", cacheKey);
246 }
247 } else {
248 // Insert 'null' queue for this cacheKey, indicating there is now a request in
249 // flight.
250 mWaitingRequests.put(cacheKey, null);
251 mCacheQueue.add(request);
252 }
253 return request;
254 }
255 }
256
257 /**
258 * Called from {@link Request#finish(String)}, indicating that processing of the given request
259 * has finished.
260 *
261 * <p>Releases waiting requests for <code>request.getCacheKey()</code> if
262 * <code>request.shouldCache()</code>.</p>
263 */
Ficus Kirkpatrick35d5cc32014-02-08 11:15:04 -0800264 void finish(Request<?> request) {
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -0800265 // Remove from the set of requests currently being processed.
266 synchronized (mCurrentRequests) {
267 mCurrentRequests.remove(request);
268 }
269
270 if (request.shouldCache()) {
271 synchronized (mWaitingRequests) {
272 String cacheKey = request.getCacheKey();
Ficus Kirkpatrick35d5cc32014-02-08 11:15:04 -0800273 Queue<Request<?>> waitingRequests = mWaitingRequests.remove(cacheKey);
Jean-Baptiste Querud56b88a2012-11-07 07:48:57 -0800274 if (waitingRequests != null) {
275 if (VolleyLog.DEBUG) {
276 VolleyLog.v("Releasing %d waiting requests for cacheKey=%s.",
277 waitingRequests.size(), cacheKey);
278 }
279 // Process all queued up requests. They won't be considered as in flight, but
280 // that's not a problem as the cache has been primed by 'request'.
281 mCacheQueue.addAll(waitingRequests);
282 }
283 }
284 }
285 }
286}