/* * Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions.
*/ package sun.nio.ch;
/** * Polls file descriptors. Virtual threads invoke the poll method to park * until a given file descriptor is ready for I/O.
*/ publicabstractclass Poller { privatestaticfinal JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); privatestaticfinal Poller[] READ_POLLERS; privatestaticfinal Poller[] WRITE_POLLERS; privatestaticfinalint READ_MASK, WRITE_MASK; privatestaticfinalboolean USE_DIRECT_REGISTER;
// true if this is a poller for reading, false for writing privatefinalboolean read;
// maps file descriptors to parked Thread privatefinal Map<Integer, Thread> map = new ConcurrentHashMap<>();
// the queue of updates to the updater Thread privatefinal BlockingQueue<Request> queue = new LinkedTransferQueue<>();
/**
 * Creates a poller dedicated to one direction of I/O.
 *
 * @param read {@code true} for a poller of read events, {@code false} for
 *        a poller of write events
 */
protected Poller(boolean read) {
    this.read = read;
}
/**
 * Tells whether this poller handles read (POLLIN) events.
 *
 * @return {@code true} for a read poller, {@code false} for a write poller
 */
final boolean reading() {
    return this.read;
}
/** * Parks the current thread until a file descriptor is ready for the given op. * @param fdVal the file descriptor * @param event POLLIN or POLLOUT * @param nanos the waiting time or 0 to wait indefinitely * @param supplier supplies a boolean to indicate if the enclosing object is open
*/ publicstaticvoid poll(int fdVal, int event, long nanos, BooleanSupplier supplier) throws IOException
{ assert nanos >= 0L; if (event == Net.POLLIN) {
readPoller(fdVal).poll(fdVal, nanos, supplier);
} elseif (event == Net.POLLOUT) {
writePoller(fdVal).poll(fdVal, nanos, supplier);
} else { assertfalse;
}
}
/**
 * Parks the calling thread until the file descriptor is ready, choosing the
 * registration strategy from {@code USE_DIRECT_REGISTER}: direct
 * registration when set, otherwise registration via the updater queue.
 *
 * @param fdVal the file descriptor
 * @param nanos the waiting time, a value that is not positive waits indefinitely
 * @param supplier supplies a boolean to indicate if the enclosing object is open
 * @throws IOException if direct registration fails with an I/O error
 */
private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
    if (!USE_DIRECT_REGISTER) {
        poll2(fdVal, nanos, supplier);
        return;
    }
    poll1(fdVal, nanos, supplier);
}
/**
 * Direct-registration variant of poll: registers the file descriptor from
 * the calling thread, parks, and always deregisters on the way out.
 *
 * @param fdVal the file descriptor
 * @param nanos the waiting time, a value that is not positive waits indefinitely
 * @param supplier supplies a boolean to indicate if the enclosing object is open;
 *        the thread does not park when it reports {@code false}
 * @throws IOException if registering the file descriptor fails
 */
private void poll1(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
    register(fdVal);
    try {
        if (!supplier.getAsBoolean()) {
            // enclosing object is no longer open, nothing to wait for
            return;
        }
        if (nanos > 0) {
            LockSupport.parkNanos(nanos);
        } else {
            LockSupport.park();
        }
    } finally {
        deregister(fdVal);
    }
}
/**
 * Queued-registration variant of poll: hands the file descriptor to the
 * updater thread, then parks. On the way out it waits until the updater has
 * processed the request before deregistering the file descriptor.
 *
 * @param fdVal the file descriptor
 * @param nanos the waiting time, a value that is not positive waits indefinitely
 * @param supplier supplies a boolean to indicate if the enclosing object is open;
 *        the thread does not park when it reports {@code false}
 */
private void poll2(int fdVal, long nanos, BooleanSupplier supplier) {
    Request pending = registerAsync(fdVal);
    try {
        if (!supplier.getAsBoolean()) {
            // enclosing object is no longer open, nothing to wait for
            return;
        }
        if (nanos > 0) {
            LockSupport.parkNanos(nanos);
        } else {
            LockSupport.park();
        }
    } finally {
        // the request must have been processed before it can be undone
        pending.awaitFinish();
        deregister(fdVal);
    }
}
/**
 * Records the calling thread as the waiter for the file descriptor and
 * enqueues a registration request for the updater thread.
 *
 * @param fdVal the file descriptor
 * @return the queued request, used to wait for the updater to process it
 */
private Request registerAsync(int fdVal) {
    Thread self = Thread.currentThread();
    Thread existing = map.putIfAbsent(fdVal, self);
    assert existing == null;

    Request pending = new Request(fdVal);
    queue.add(pending);
    return pending;
}
/**
 * Removes the mapping for the file descriptor and, only if a mapping was
 * still present, deregisters it from the polling facility. Does nothing
 * further when the mapping has already been removed.
 *
 * @param fdVal the file descriptor
 */
private void deregister(int fdVal) {
    Thread owner = map.remove(fdVal);
    assert owner == null || owner == Thread.currentThread();
    if (owner == null) {
        // mapping already gone, no deregistration needed
        return;
    }
    implDeregister(fdVal);
}
/** * A registration request queued to the updater thread.
*/ privatestaticclass Request { privatefinalint fdVal; privatevolatileboolean done; privatevolatileThread waiter;
Request(int fdVal) { this.fdVal = fdVal;
}
privateint fdVal() { return fdVal;
}
/** * Invoked by the updater when the request has been processed.
*/ void finish() {
done = true; Thread waiter = this.waiter; if (waiter != null) {
LockSupport.unpark(waiter);
}
}
/** * Waits for a request to be processed.
*/ void awaitFinish() { if (!done) {
waiter = Thread.currentThread(); boolean interrupted = false; while (!done) {
LockSupport.park(); if (Thread.interrupted()) {
interrupted = true;
}
} if (interrupted) { Thread.currentThread().interrupt();
}
}
}
}
/**
 * Runs the update loop: repeatedly takes the next request from the queue,
 * registers its file descriptor and marks the request as finished.
 * Interrupts while waiting on the queue are ignored. Any exception thrown
 * while processing has its stack trace printed and ends the loop.
 */
private void updateLoop() {
    try {
        while (true) {
            Request next;
            do {
                try {
                    next = queue.take();
                } catch (InterruptedException ignore) {
                    // keep waiting for a request
                    next = null;
                }
            } while (next == null);

            implRegister(next.fdVal());
            next.finish();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/** * Maps the file descriptor value to a read poller.
*/ privatestatic Poller readPoller(int fdVal) { return READ_POLLERS[fdVal & READ_MASK];
}
/** * Maps the file descriptor value to a write poller.
*/ privatestatic Poller writePoller(int fdVal) { return WRITE_POLLERS[fdVal & WRITE_MASK];
}
/** * Unparks any thread that is polling the given file descriptor for the * given event.
*/ staticvoid stopPoll(int fdVal, int event) { if (event == Net.POLLIN) {
readPoller(fdVal).wakeup(fdVal);
} elseif (event == Net.POLLOUT) {
writePoller(fdVal).wakeup(fdVal);
} else { thrownew IllegalArgumentException();
}
}
/**
 * Wakes up the threads, if any, that are polling the file descriptor for
 * reading or for writing. The read poller is handled first, then the write
 * poller.
 *
 * @param fdVal the file descriptor
 */
static void stopPoll(int fdVal) {
    readPoller(fdVal).wakeup(fdVal);
    writePoller(fdVal).wakeup(fdVal);
}
/**
 * Removes the mapping for the file descriptor and unparks the thread that
 * was mapped to it. Does nothing when no thread is mapped.
 *
 * @param fdVal the file descriptor
 */
private void wakeup(int fdVal) {
    Thread parked = map.remove(fdVal);
    if (parked == null) {
        return;
    }
    LockSupport.unpark(parked);
}
/**
 * Called by the polling facility when the file descriptor is polled.
 * Removes the mapping for the file descriptor and unparks the thread that
 * was waiting on it, if any.
 *
 * @param fdVal the file descriptor that was polled
 */
final void polled(int fdVal) {
    Thread parked = map.remove(fdVal);
    if (parked != null) {
        LockSupport.unpark(parked);
    }
}
/**
 * Polls for events, invoking {@link #polled(int)} for each file descriptor
 * that is polled.
 *
 * @param timeout a positive value blocks for up to {@code timeout}
 *        milliseconds, zero does not block, and -1 blocks indefinitely
 * @return NOTE(review): the meaning of the result is not stated here;
 *         presumably the number of file descriptors polled -- confirm
 *         against the implementations
 * @throws IOException declared so that implementations can report I/O errors
 */
abstract int poll(int timeout) throws IOException;
/** * Create the read poller(s).
*/ privatestatic Poller[] createReadPollers(PollerProvider provider) throws IOException { int readPollerCount = pollerCount("jdk.readPollers");
Poller[] readPollers = new Poller[readPollerCount]; for (int i = 0; i< readPollerCount; i++) { var poller = provider.readPoller();
readPollers[i] = poller.start();
} return readPollers;
}
/** * Create the write poller(s).
*/ privatestatic Poller[] createWritePollers(PollerProvider provider) throws IOException { int writePollerCount = pollerCount("jdk.writePollers");
Poller[] writePollers = new Poller[writePollerCount]; for (int i = 0; i< writePollerCount; i++) { var poller = provider.writePoller();
writePollers[i] = poller.start();
} return writePollers;
}
/** * Reads the given property name to get the poller count. If the property is * set then the value must be a power of 2. Returns 1 if the property is not * set. * @throws IllegalArgumentException if the property is set to a value that * is not a power of 2.
*/ privatestaticint pollerCount(String propName) {
String s = GetPropertyAction.privilegedGetProperty(propName, "1"); int count = Integer.parseInt(s);
// check power of 2 if (count != (1 << log2(count))) {
String msg = propName + " is set to a vale that is not a power of 2"; thrownew IllegalArgumentException(msg);
} return count;
}
/** * Return a stream of all threads blocked waiting for I/O operations.
*/ publicstatic Stream<Thread> blockedThreads() {
Stream<Thread> s = Stream.empty(); for (int i = 0; i < READ_POLLERS.length; i++) {
s = Stream.concat(s, READ_POLLERS[i].registeredThreads());
} for (int i = 0; i < WRITE_POLLERS.length; i++) {
s = Stream.concat(s, WRITE_POLLERS[i].registeredThreads());
} return s;
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.