blob: 162a8baa970b1823cdc87aca680add6d29c9a0c1 [file] [log] [blame]
/*
 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
25
26package sun.nio.ch;
27
28import java.nio.channels.spi.AsynchronousChannelProvider;
29import java.io.IOException;
30import java.util.concurrent.ArrayBlockingQueue;
31import java.util.concurrent.RejectedExecutionException;
32import java.util.concurrent.atomic.AtomicInteger;
33import static sun.nio.ch.EPoll.*;
34
35/**
36 * AsynchronousChannelGroup implementation based on the Linux epoll facility.
37 */
38
39final class EPollPort
40 extends Port
41{
42 // maximum number of events to poll at a time
43 private static final int MAX_EPOLL_EVENTS = 512;
44
45 // errors
46 private static final int ENOENT = 2;
47
48 // epoll file descriptor
49 private final int epfd;
50
51 // true if epoll closed
52 private boolean closed;
53
54 // socket pair used for wakeup
55 private final int sp[];
56
57 // number of wakeups pending
58 private final AtomicInteger wakeupCount = new AtomicInteger();
59
60 // address of the poll array passed to epoll_wait
61 private final long address;
62
63 // encapsulates an event for a channel
64 static class Event {
65 final PollableChannel channel;
66 final int events;
67
68 Event(PollableChannel channel, int events) {
69 this.channel = channel;
70 this.events = events;
71 }
72
73 PollableChannel channel() { return channel; }
74 int events() { return events; }
75 }
76
77 // queue of events for cases that a polling thread dequeues more than one
78 // event
79 private final ArrayBlockingQueue<Event> queue;
80 private final Event NEED_TO_POLL = new Event(null, 0);
81 private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
82
83 EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
84 throws IOException
85 {
86 super(provider, pool);
87
88 // open epoll
89 this.epfd = epollCreate();
90
91 // create socket pair for wakeup mechanism
92 int[] sv = new int[2];
93 try {
94 socketpair(sv);
95 // register one end with epoll
Shubham Ajmera519adb22016-06-29 15:13:16 +010096 epollCtl(epfd, EPOLL_CTL_ADD, sv[0], Net.POLLIN);
Shubham Ajmerae318a0e2016-04-08 15:32:00 +010097 } catch (IOException x) {
98 close0(epfd);
99 throw x;
100 }
101 this.sp = sv;
102
103 // allocate the poll array
104 this.address = allocatePollArray(MAX_EPOLL_EVENTS);
105
106 // create the queue and offer the special event to ensure that the first
107 // threads polls
108 this.queue = new ArrayBlockingQueue<Event>(MAX_EPOLL_EVENTS);
109 this.queue.offer(NEED_TO_POLL);
110 }
111
112 EPollPort start() {
113 startThreads(new EventHandlerTask());
114 return this;
115 }
116
117 /**
118 * Release all resources
119 */
120 private void implClose() {
121 synchronized (this) {
122 if (closed)
123 return;
124 closed = true;
125 }
126 freePollArray(address);
127 close0(sp[0]);
128 close0(sp[1]);
129 close0(epfd);
130 }
131
132 private void wakeup() {
133 if (wakeupCount.incrementAndGet() == 1) {
134 // write byte to socketpair to force wakeup
135 try {
136 interrupt(sp[1]);
137 } catch (IOException x) {
138 throw new AssertionError(x);
139 }
140 }
141 }
142
143 @Override
144 void executeOnHandlerTask(Runnable task) {
145 synchronized (this) {
146 if (closed)
147 throw new RejectedExecutionException();
148 offerTask(task);
149 wakeup();
150 }
151 }
152
153 @Override
154 void shutdownHandlerTasks() {
155 /*
156 * If no tasks are running then just release resources; otherwise
157 * write to the one end of the socketpair to wakeup any polling threads.
158 */
159 int nThreads = threadCount();
160 if (nThreads == 0) {
161 implClose();
162 } else {
163 // send interrupt to each thread
164 while (nThreads-- > 0) {
165 wakeup();
166 }
167 }
168 }
169
170 // invoke by clients to register a file descriptor
171 @Override
172 void startPoll(int fd, int events) {
173 // update events (or add to epoll on first usage)
174 int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
175 if (err == ENOENT)
176 err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
177 if (err != 0)
178 throw new AssertionError(); // should not happen
179 }
180
181 /*
182 * Task to process events from epoll and dispatch to the channel's
183 * onEvent handler.
184 *
185 * Events are retreived from epoll in batch and offered to a BlockingQueue
186 * where they are consumed by handler threads. A special "NEED_TO_POLL"
187 * event is used to signal one consumer to re-poll when all events have
188 * been consumed.
189 */
190 private class EventHandlerTask implements Runnable {
191 private Event poll() throws IOException {
192 try {
193 for (;;) {
194 int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);
195 /*
196 * 'n' events have been read. Here we map them to their
197 * corresponding channel in batch and queue n-1 so that
198 * they can be handled by other handler threads. The last
199 * event is handled by this thread (and so is not queued).
200 */
201 fdToChannelLock.readLock().lock();
202 try {
203 while (n-- > 0) {
204 long eventAddress = getEvent(address, n);
205 int fd = getDescriptor(eventAddress);
206
207 // wakeup
208 if (fd == sp[0]) {
209 if (wakeupCount.decrementAndGet() == 0) {
210 // no more wakeups so drain pipe
211 drain1(sp[0]);
212 }
213
214 // queue special event if there are more events
215 // to handle.
216 if (n > 0) {
217 queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
218 continue;
219 }
220 return EXECUTE_TASK_OR_SHUTDOWN;
221 }
222
223 PollableChannel channel = fdToChannel.get(fd);
224 if (channel != null) {
225 int events = getEvents(eventAddress);
226 Event ev = new Event(channel, events);
227
228 // n-1 events are queued; This thread handles
229 // the last one except for the wakeup
230 if (n > 0) {
231 queue.offer(ev);
232 } else {
233 return ev;
234 }
235 }
236 }
237 } finally {
238 fdToChannelLock.readLock().unlock();
239 }
240 }
241 } finally {
242 // to ensure that some thread will poll when all events have
243 // been consumed
244 queue.offer(NEED_TO_POLL);
245 }
246 }
247
248 public void run() {
249 Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
250 Invoker.getGroupAndInvokeCount();
251 final boolean isPooledThread = (myGroupAndInvokeCount != null);
252 boolean replaceMe = false;
253 Event ev;
254 try {
255 for (;;) {
256 // reset invoke count
257 if (isPooledThread)
258 myGroupAndInvokeCount.resetInvokeCount();
259
260 try {
261 replaceMe = false;
262 ev = queue.take();
263
264 // no events and this thread has been "selected" to
265 // poll for more.
266 if (ev == NEED_TO_POLL) {
267 try {
268 ev = poll();
269 } catch (IOException x) {
270 x.printStackTrace();
271 return;
272 }
273 }
274 } catch (InterruptedException x) {
275 continue;
276 }
277
278 // handle wakeup to execute task or shutdown
279 if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
280 Runnable task = pollTask();
281 if (task == null) {
282 // shutdown request
283 return;
284 }
285 // run task (may throw error/exception)
286 replaceMe = true;
287 task.run();
288 continue;
289 }
290
291 // process event
292 try {
293 ev.channel().onEvent(ev.events(), isPooledThread);
294 } catch (Error x) {
295 replaceMe = true; throw x;
296 } catch (RuntimeException x) {
297 replaceMe = true; throw x;
298 }
299 }
300 } finally {
301 // last handler to exit when shutdown releases resources
302 int remaining = threadExit(this, replaceMe);
303 if (remaining == 0 && isShutdown()) {
304 implClose();
305 }
306 }
307 }
308 }
309
310 // -- Native methods --
311
312 private static native void socketpair(int[] sv) throws IOException;
313
314 private static native void interrupt(int fd) throws IOException;
315
316 private static native void drain1(int fd) throws IOException;
317
318 private static native void close0(int fd);
Neil Fullerdd2c67a2017-12-07 14:40:34 +0000319
320 // Android-removed: Code to load native libraries, doesn't make sense on Android.
321 /*
322 static {
323 IOUtil.load();
324 }
325 */
Shubham Ajmerae318a0e2016-04-08 15:32:00 +0100326}