blob: dff3378ae990176e83e084fdf1ba6116370e3a31 [file] [log] [blame]
J. Duke319a3b92007-12-01 00:00:00 +00001/*
2 * Copyright 1999 Sun Microsystems, Inc. All Rights Reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *
8 * - Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 *
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * - Neither the name of Sun Microsystems nor the names of its
16 * contributors may be used to endorse or promote products derived
17 * from this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
20 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31
32/*
33 File: SLQ.java
34 Originally: LinkedQueue.java
35
36 Originally written by Doug Lea and released into the public domain.
37 This may be used for any purposes whatsoever without acknowledgment.
38 Thanks for the assistance and support of Sun Microsystems Labs,
39 and everyone contributing, testing, and using this code.
40
41 History:
42 Date Who What
43 11Jun1998 dl Create public version
44 25aug1998 dl added peek
45 10dec1998 dl added isEmpty
46 10jun1999 bc modified for isolated use
47*/
48
49// Original was in package EDU.oswego.cs.dl.util.concurrent;
50
51/**
52 * A linked list based channel implementation,
53 * adapted from the TwoLockQueue class from CPJ.
54 * The algorithm avoids contention between puts
55 * and takes when the queue is not empty.
56 * Normally a put and a take can proceed simultaneously.
57 * (Although it does not allow multiple concurrent puts or takes.)
58 * This class tends to perform more efficiently than
59 * other Channel implementations in producer/consumer
60 * applications.
61 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
62 **/
63
public class LinkedQueue {

    /*
     * Locking discipline used throughout this class:
     *
     *  - lastMonitor_ guards last_ and waitingForTake_, and is the monitor
     *    that blocked takers wait on.
     *  - The queue's own monitor (this) guards head_ and count_.
     *  - When both monitors are needed, lastMonitor_ is always acquired
     *    first and this second, so the two can never deadlock.
     *  - LinkedNode.next is volatile. A put links a node while holding only
     *    lastMonitor_, and a take may read that link while holding only
     *    this; the volatile write/read pair is what makes the new node and
     *    its value visible to the taker.
     */

    /**
     * Dummy header node of list. The first actual node, if it exists, is always
     * at head_.next. After each take, the old first node becomes the head.
     * Guarded by this queue's own monitor.
     **/
    protected LinkedNode head_;

    /**
     * Number of elements in the queue. Guarded by this queue's own monitor.
     * It is incremented just before a new node is linked, so it may briefly
     * count an element that a concurrent take cannot see yet, but it never
     * drops below the number of linked elements.
     **/
    protected int count_;

    /**
     * Helper monitor for managing access to last node, in case it is also first.
     * last_ and waitingForTake_ are ONLY used while synchronized on lastMonitor_.
     **/
    protected final Object lastMonitor_ = new Object();

    /**
     * The last node of list. Put() appends to list, so modifies last_
     **/
    protected LinkedNode last_;

    /**
     * The number of threads waiting for a take.
     * Notifications are provided in put only if greater than zero.
     * The bookkeeping is worth it here since in reasonably balanced
     * usages, the notifications will hardly ever be necessary, so
     * the call overhead to notify can be eliminated.
     **/
    protected int waitingForTake_ = 0;

    /** Creates an empty queue consisting of just the dummy header node. **/
    public LinkedQueue() {
        head_ = new LinkedNode(null);
        last_ = head_;
        count_ = 0;
    }

    /**
     * Main mechanics for put/offer: appends x to the end of the list and
     * wakes one waiting taker, if there is any.
     *
     * @param x the element to append
     **/
    protected void insert(Object x) {
        int size;
        synchronized (lastMonitor_) {
            LinkedNode p = new LinkedNode(x);
            // count_ is owned by the queue monitor (extract() decrements it
            // there), so the increment must use the same monitor. Counting
            // before linking keeps count_ from ever going negative.
            synchronized (this) {
                size = ++count_;
            }
            // Volatile write: publishes p (and p.value) to takers.
            last_.next = p;
            last_ = p;
            if (waitingForTake_ > 0) {
                lastMonitor_.notify();
            }
        }
        // Diagnostic output is done after releasing the lock so that slow
        // console I/O does not stall other producers or blocked takers.
        if (size > 1000 && size % 1000 == 0) {
            System.out.println("In Queue : " + size);
        }
    }

    /**
     * Main mechanics for take/poll: removes and returns the first element.
     *
     * @return the removed element, or null if the queue is empty
     **/
    protected synchronized Object extract() {
        Object x = null;
        LinkedNode first = head_.next;
        if (first != null) {
            x = first.value;
            // The removed node becomes the new dummy header; drop its value
            // so the queue does not keep the element reachable.
            first.value = null;
            head_ = first;
            count_--;
        }
        return x;
    }

    /**
     * Appends x to the queue. The queue is unbounded, so this never blocks
     * waiting for space.
     *
     * @param x the element to add; must not be null
     * @throws IllegalArgumentException if x is null
     * @throws InterruptedException if the calling thread was interrupted
     **/
    public void put(Object x) throws InterruptedException {
        if (x == null) {
            throw new IllegalArgumentException();
        }
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        insert(x);
    }

    /**
     * Appends x to the queue. Because the queue is unbounded the element is
     * always accepted immediately and msecs is not used.
     *
     * @param x the element to add; must not be null
     * @param msecs unused
     * @return always true
     * @throws IllegalArgumentException if x is null
     * @throws InterruptedException if the calling thread was interrupted
     **/
    public boolean offer(Object x, long msecs) throws InterruptedException {
        if (x == null) {
            throw new IllegalArgumentException();
        }
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        insert(x);
        return true;
    }

    /**
     * Removes and returns the first element, waiting as long as necessary
     * for one to become available.
     *
     * @return the removed element (never null)
     * @throws InterruptedException if the calling thread is interrupted
     *         before or while waiting
     **/
    public Object take() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // Try to extract without touching lastMonitor_. If that fails,
        // enter the wait-based retry loop.
        Object x = extract();
        if (x != null) {
            return x;
        }
        synchronized (lastMonitor_) {
            ++waitingForTake_;
            try {
                for (;;) {
                    x = extract();
                    if (x != null) {
                        return x;
                    }
                    lastMonitor_.wait();
                }
            } catch (InterruptedException ex) {
                // This thread may have been chosen by a notify() it can no
                // longer act on; hand the wake-up to another waiter.
                lastMonitor_.notify();
                throw ex;
            } finally {
                --waitingForTake_;
            }
        }
    }

    /**
     * Returns the first element without removing it.
     *
     * @return the first element, or null if the queue is empty
     **/
    public synchronized Object peek() {
        LinkedNode first = head_.next;
        return (first != null) ? first.value : null;
    }

    /**
     * Tells whether the queue currently holds no elements.
     *
     * @return true if there is no node after the dummy header
     **/
    public synchronized boolean isEmpty() {
        return head_.next == null;
    }

    /**
     * Removes and returns the first element, waiting up to msecs
     * milliseconds for one to become available.
     *
     * @param msecs maximum time to wait in milliseconds; zero or a negative
     *        value means do not wait
     * @return the removed element, or null if none became available in time
     * @throws InterruptedException if the calling thread is interrupted
     *         before or while waiting
     **/
    public Object poll(long msecs) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Object x = extract();
        if (x != null) {
            return x;
        }
        synchronized (lastMonitor_) {
            ++waitingForTake_;
            try {
                long waitTime = msecs;
                long start = (msecs <= 0) ? 0 : System.currentTimeMillis();
                for (;;) {
                    x = extract();
                    if (x != null || waitTime <= 0) {
                        return x;
                    }
                    lastMonitor_.wait(waitTime);
                    // Recompute the remaining budget; wait() can return
                    // early without an element being available.
                    waitTime = msecs - (System.currentTimeMillis() - start);
                }
            } catch (InterruptedException ex) {
                // Pass on a notification this thread may have consumed.
                lastMonitor_.notify();
                throw ex;
            } finally {
                --waitingForTake_;
            }
        }
    }

    /** Singly linked list cell holding one queued element. **/
    class LinkedNode {
        /** The queued element; null in the dummy header node. **/
        Object value;

        /**
         * Successor node, or null at the end of the list. Volatile because
         * it is written under lastMonitor_ but read under the queue monitor.
         **/
        volatile LinkedNode next;

        LinkedNode(Object x) { value = x; }

        LinkedNode(Object x, LinkedNode n) { value = x; next = n; }
    }
}