/*
 * Copyright (C) 2011 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| 16 | |
| 17 | package com.android.volley; |
| 18 | |
| 19 | import android.os.Handler; |
| 20 | import android.os.Looper; |
| 21 | |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 22 | import java.util.HashMap; |
| 23 | import java.util.HashSet; |
| 24 | import java.util.LinkedList; |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 25 | import java.util.Map; |
| 26 | import java.util.Queue; |
| 27 | import java.util.Set; |
| 28 | import java.util.concurrent.PriorityBlockingQueue; |
| 29 | import java.util.concurrent.atomic.AtomicInteger; |
| 30 | |
| 31 | /** |
| 32 | * A request dispatch queue with a thread pool of dispatchers. |
| 33 | * |
| 34 | * Calling {@link #add(Request)} will enqueue the given Request for dispatch, |
| 35 | * resolving from either cache or network on a worker thread, and then delivering |
| 36 | * a parsed response on the main thread. |
| 37 | */ |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 38 | public class RequestQueue { |
| 39 | |
| 40 | /** Used for generating monotonically-increasing sequence numbers for requests. */ |
Jean-Baptiste Queru | 6772bce | 2012-11-07 07:51:33 -0800 | [diff] [blame] | 41 | private AtomicInteger mSequenceGenerator = new AtomicInteger(); |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 42 | |
| 43 | /** |
| 44 | * Staging area for requests that already have a duplicate request in flight. |
| 45 | * |
| 46 | * <ul> |
| 47 | * <li>containsKey(cacheKey) indicates that there is a request in flight for the given cache |
| 48 | * key.</li> |
| 49 | * <li>get(cacheKey) returns waiting requests for the given cache key. The in flight request |
| 50 | * is <em>not</em> contained in that list. Is null if no requests are staged.</li> |
| 51 | * </ul> |
| 52 | */ |
Ficus Kirkpatrick | 35d5cc3 | 2014-02-08 11:15:04 -0800 | [diff] [blame] | 53 | private final Map<String, Queue<Request<?>>> mWaitingRequests = |
| 54 | new HashMap<String, Queue<Request<?>>>(); |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 55 | |
| 56 | /** |
| 57 | * The set of all requests currently being processed by this RequestQueue. A Request |
| 58 | * will be in this set if it is waiting in any queue or currently being processed by |
| 59 | * any dispatcher. |
| 60 | */ |
Ficus Kirkpatrick | 35d5cc3 | 2014-02-08 11:15:04 -0800 | [diff] [blame] | 61 | private final Set<Request<?>> mCurrentRequests = new HashSet<Request<?>>(); |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 62 | |
| 63 | /** The cache triage queue. */ |
Ficus Kirkpatrick | 35d5cc3 | 2014-02-08 11:15:04 -0800 | [diff] [blame] | 64 | private final PriorityBlockingQueue<Request<?>> mCacheQueue = |
| 65 | new PriorityBlockingQueue<Request<?>>(); |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 66 | |
| 67 | /** The queue of requests that are actually going out to the network. */ |
Ficus Kirkpatrick | 35d5cc3 | 2014-02-08 11:15:04 -0800 | [diff] [blame] | 68 | private final PriorityBlockingQueue<Request<?>> mNetworkQueue = |
| 69 | new PriorityBlockingQueue<Request<?>>(); |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 70 | |
| 71 | /** Number of network request dispatcher threads to start. */ |
| 72 | private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4; |
| 73 | |
Ficus Kirkpatrick | 35d5cc3 | 2014-02-08 11:15:04 -0800 | [diff] [blame] | 74 | /** Cache interface for retrieving and storing responses. */ |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 75 | private final Cache mCache; |
| 76 | |
| 77 | /** Network interface for performing requests. */ |
| 78 | private final Network mNetwork; |
| 79 | |
| 80 | /** Response delivery mechanism. */ |
| 81 | private final ResponseDelivery mDelivery; |
| 82 | |
| 83 | /** The network dispatchers. */ |
| 84 | private NetworkDispatcher[] mDispatchers; |
| 85 | |
| 86 | /** The cache dispatcher. */ |
| 87 | private CacheDispatcher mCacheDispatcher; |
| 88 | |
| 89 | /** |
| 90 | * Creates the worker pool. Processing will not begin until {@link #start()} is called. |
| 91 | * |
| 92 | * @param cache A Cache to use for persisting responses to disk |
| 93 | * @param network A Network interface for performing HTTP requests |
| 94 | * @param threadPoolSize Number of network dispatcher threads to create |
| 95 | * @param delivery A ResponseDelivery interface for posting responses and errors |
| 96 | */ |
| 97 | public RequestQueue(Cache cache, Network network, int threadPoolSize, |
| 98 | ResponseDelivery delivery) { |
| 99 | mCache = cache; |
| 100 | mNetwork = network; |
| 101 | mDispatchers = new NetworkDispatcher[threadPoolSize]; |
| 102 | mDelivery = delivery; |
| 103 | } |
| 104 | |
| 105 | /** |
| 106 | * Creates the worker pool. Processing will not begin until {@link #start()} is called. |
| 107 | * |
| 108 | * @param cache A Cache to use for persisting responses to disk |
| 109 | * @param network A Network interface for performing HTTP requests |
| 110 | * @param threadPoolSize Number of network dispatcher threads to create |
| 111 | */ |
| 112 | public RequestQueue(Cache cache, Network network, int threadPoolSize) { |
| 113 | this(cache, network, threadPoolSize, |
| 114 | new ExecutorDelivery(new Handler(Looper.getMainLooper()))); |
| 115 | } |
| 116 | |
| 117 | /** |
| 118 | * Creates the worker pool. Processing will not begin until {@link #start()} is called. |
| 119 | * |
| 120 | * @param cache A Cache to use for persisting responses to disk |
| 121 | * @param network A Network interface for performing HTTP requests |
| 122 | */ |
| 123 | public RequestQueue(Cache cache, Network network) { |
| 124 | this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE); |
| 125 | } |
| 126 | |
| 127 | /** |
| 128 | * Starts the dispatchers in this queue. |
| 129 | */ |
| 130 | public void start() { |
| 131 | stop(); // Make sure any currently running dispatchers are stopped. |
| 132 | // Create the cache dispatcher and start it. |
| 133 | mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery); |
| 134 | mCacheDispatcher.start(); |
| 135 | |
| 136 | // Create network dispatchers (and corresponding threads) up to the pool size. |
| 137 | for (int i = 0; i < mDispatchers.length; i++) { |
| 138 | NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork, |
| 139 | mCache, mDelivery); |
| 140 | mDispatchers[i] = networkDispatcher; |
| 141 | networkDispatcher.start(); |
| 142 | } |
| 143 | } |
| 144 | |
| 145 | /** |
| 146 | * Stops the cache and network dispatchers. |
| 147 | */ |
| 148 | public void stop() { |
| 149 | if (mCacheDispatcher != null) { |
| 150 | mCacheDispatcher.quit(); |
| 151 | } |
| 152 | for (int i = 0; i < mDispatchers.length; i++) { |
| 153 | if (mDispatchers[i] != null) { |
| 154 | mDispatchers[i].quit(); |
| 155 | } |
| 156 | } |
| 157 | } |
| 158 | |
| 159 | /** |
| 160 | * Gets a sequence number. |
| 161 | */ |
Jean-Baptiste Queru | 6772bce | 2012-11-07 07:51:33 -0800 | [diff] [blame] | 162 | public int getSequenceNumber() { |
| 163 | return mSequenceGenerator.incrementAndGet(); |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 164 | } |
| 165 | |
| 166 | /** |
Evan Charlton | 0acc793 | 2012-11-06 13:58:19 -0800 | [diff] [blame] | 167 | * Gets the {@link Cache} instance being used. |
| 168 | */ |
| 169 | public Cache getCache() { |
| 170 | return mCache; |
| 171 | } |
| 172 | |
| 173 | /** |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 174 | * A simple predicate or filter interface for Requests, for use by |
| 175 | * {@link RequestQueue#cancelAll(RequestFilter)}. |
| 176 | */ |
| 177 | public interface RequestFilter { |
| 178 | public boolean apply(Request<?> request); |
| 179 | } |
| 180 | |
| 181 | /** |
| 182 | * Cancels all requests in this queue for which the given filter applies. |
| 183 | * @param filter The filtering function to use |
| 184 | */ |
| 185 | public void cancelAll(RequestFilter filter) { |
| 186 | synchronized (mCurrentRequests) { |
| 187 | for (Request<?> request : mCurrentRequests) { |
| 188 | if (filter.apply(request)) { |
| 189 | request.cancel(); |
| 190 | } |
| 191 | } |
| 192 | } |
| 193 | } |
| 194 | |
| 195 | /** |
| 196 | * Cancels all requests in this queue with the given tag. Tag must be non-null |
| 197 | * and equality is by identity. |
| 198 | */ |
| 199 | public void cancelAll(final Object tag) { |
| 200 | if (tag == null) { |
| 201 | throw new IllegalArgumentException("Cannot cancelAll with a null tag"); |
| 202 | } |
| 203 | cancelAll(new RequestFilter() { |
| 204 | @Override |
| 205 | public boolean apply(Request<?> request) { |
| 206 | return request.getTag() == tag; |
| 207 | } |
| 208 | }); |
| 209 | } |
| 210 | |
| 211 | /** |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 212 | * Adds a Request to the dispatch queue. |
| 213 | * @param request The request to service |
| 214 | * @return The passed-in request |
| 215 | */ |
Max Cai | 393504e | 2014-02-10 17:08:32 +0000 | [diff] [blame] | 216 | public <T> Request<T> add(Request<T> request) { |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 217 | // Tag the request as belonging to this queue and add it to the set of current requests. |
| 218 | request.setRequestQueue(this); |
| 219 | synchronized (mCurrentRequests) { |
| 220 | mCurrentRequests.add(request); |
| 221 | } |
| 222 | |
| 223 | // Process requests in the order they are added. |
Jean-Baptiste Queru | 6772bce | 2012-11-07 07:51:33 -0800 | [diff] [blame] | 224 | request.setSequence(getSequenceNumber()); |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 225 | request.addMarker("add-to-queue"); |
| 226 | |
| 227 | // If the request is uncacheable, skip the cache queue and go straight to the network. |
| 228 | if (!request.shouldCache()) { |
| 229 | mNetworkQueue.add(request); |
| 230 | return request; |
| 231 | } |
| 232 | |
| 233 | // Insert request into stage if there's already a request with the same cache key in flight. |
| 234 | synchronized (mWaitingRequests) { |
| 235 | String cacheKey = request.getCacheKey(); |
| 236 | if (mWaitingRequests.containsKey(cacheKey)) { |
| 237 | // There is already a request in flight. Queue up. |
Ficus Kirkpatrick | 35d5cc3 | 2014-02-08 11:15:04 -0800 | [diff] [blame] | 238 | Queue<Request<?>> stagedRequests = mWaitingRequests.get(cacheKey); |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 239 | if (stagedRequests == null) { |
Ficus Kirkpatrick | 35d5cc3 | 2014-02-08 11:15:04 -0800 | [diff] [blame] | 240 | stagedRequests = new LinkedList<Request<?>>(); |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 241 | } |
| 242 | stagedRequests.add(request); |
| 243 | mWaitingRequests.put(cacheKey, stagedRequests); |
| 244 | if (VolleyLog.DEBUG) { |
| 245 | VolleyLog.v("Request for cacheKey=%s is in flight, putting on hold.", cacheKey); |
| 246 | } |
| 247 | } else { |
| 248 | // Insert 'null' queue for this cacheKey, indicating there is now a request in |
| 249 | // flight. |
| 250 | mWaitingRequests.put(cacheKey, null); |
| 251 | mCacheQueue.add(request); |
| 252 | } |
| 253 | return request; |
| 254 | } |
| 255 | } |
| 256 | |
| 257 | /** |
| 258 | * Called from {@link Request#finish(String)}, indicating that processing of the given request |
| 259 | * has finished. |
| 260 | * |
| 261 | * <p>Releases waiting requests for <code>request.getCacheKey()</code> if |
| 262 | * <code>request.shouldCache()</code>.</p> |
| 263 | */ |
Ficus Kirkpatrick | 35d5cc3 | 2014-02-08 11:15:04 -0800 | [diff] [blame] | 264 | void finish(Request<?> request) { |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 265 | // Remove from the set of requests currently being processed. |
| 266 | synchronized (mCurrentRequests) { |
| 267 | mCurrentRequests.remove(request); |
| 268 | } |
| 269 | |
| 270 | if (request.shouldCache()) { |
| 271 | synchronized (mWaitingRequests) { |
| 272 | String cacheKey = request.getCacheKey(); |
Ficus Kirkpatrick | 35d5cc3 | 2014-02-08 11:15:04 -0800 | [diff] [blame] | 273 | Queue<Request<?>> waitingRequests = mWaitingRequests.remove(cacheKey); |
Jean-Baptiste Queru | d56b88a | 2012-11-07 07:48:57 -0800 | [diff] [blame] | 274 | if (waitingRequests != null) { |
| 275 | if (VolleyLog.DEBUG) { |
| 276 | VolleyLog.v("Releasing %d waiting requests for cacheKey=%s.", |
| 277 | waitingRequests.size(), cacheKey); |
| 278 | } |
| 279 | // Process all queued up requests. They won't be considered as in flight, but |
| 280 | // that's not a problem as the cache has been primed by 'request'. |
| 281 | mCacheQueue.addAll(waitingRequests); |
| 282 | } |
| 283 | } |
| 284 | } |
| 285 | } |
| 286 | } |