/*
* Copyright ( c ) 2018 , 2022 , Oracle and / or its affiliates . All rights reserved .
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER .
*
* This code is free software ; you can redistribute it and / or modify it
* under the terms of the GNU General Public License version 2 only , as
* published by the Free Software Foundation .
*
* This code is distributed in the hope that it will be useful , but WITHOUT
* ANY WARRANTY ; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE . See the GNU General Public License
* version 2 for more details ( a copy is included in the LICENSE file that
* accompanied this code ) .
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work ; if not , write to the Free Software Foundation ,
* Inc . , 51 Franklin St , Fifth Floor , Boston , MA 02110 - 1301 USA .
*
* Please contact Oracle , 500 Oracle Parkway , Redwood Shores , CA 94065 USA
* or visit www . oracle . com if you need additional information or have any
* questions .
*/
/**
* @ test id = default
* @ bug 8284161
* @ summary Test virtual threads doing blocking I / O on java . net sockets
* @ enablePreview
* @ library / test / lib
* @ run testng / othervm BlockingSocketOps
*/
/**
* @ test id = direct - register
* @ summary Test virtual threads doing blocking I / O on java . net sockets and with
* the I / O poller configured to use direct registration
* @ enablePreview
* @ library / test / lib
* @ run testng / othervm - Djdk . useDirectRegister BlockingSocketOps
*/
/**
* @ test id = no - vmcontinuations
* @ requires vm . continuations
* @ enablePreview
* @ library / test / lib
* @ run testng / othervm - XX : + UnlockExperimentalVMOptions - XX : - VMContinuations BlockingSocketOps
*/
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import jdk.test.lib.thread .VThreadRunner;
import org.testng.annotations.Test;
import static org.testng.Assert .*;
public class BlockingSocketOps {
/**
* Socket read / write , no blocking .
*/
@Test
public void testSocketReadWrite1() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// write should not block
byte [] ba = "XXX" .getBytes("UTF-8" );
s1.getOutputStream().write(ba);
// read should not block
ba = new byte [10 ];
int n = s2.getInputStream().read(ba);
assertTrue(n > 0 );
assertTrue(ba[0 ] == 'X' );
}
});
}
/**
* Virtual thread blocks in read .
*/
@Test
public void testSocketRead1() throws Exception {
testSocketRead(0 );
}
/**
* Virtual thread blocks in timed read .
*/
@Test
public void testSocketRead2() throws Exception {
testSocketRead(60 _000 );
}
void testSocketRead(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// delayed write from sc1
byte [] ba1 = "XXX" .getBytes("UTF-8" );
runAfterParkedAsync(() -> s1.getOutputStream().write(ba1));
// read from sc2 should block
if (timeout > 0 ) {
s2.setSoTimeout(timeout);
}
byte [] ba2 = new byte [10 ];
int n = s2.getInputStream().read(ba2);
assertTrue(n > 0 );
assertTrue(ba2[0 ] == 'X' );
}
});
}
/**
* Virtual thread blocks in write .
*/
@Test
public void testSocketWrite1() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// delayed read from s2 to EOF
InputStream in = s2.getInputStream();
Thread reader = runAfterParkedAsync(() ->
in.transferTo(OutputStream.nullOutputStream()));
// write should block
byte [] ba = new byte [100 *1024 ];
try (OutputStream out = s1.getOutputStream()) {
for (int i = 0 ; i < 1000 ; i++) {
out.write(ba);
}
}
// wait for reader to finish
reader.join();
}
});
}
/**
* Virtual thread blocks in read , peer closes connection gracefully .
*/
@Test
public void testSocketReadPeerClose1() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// delayed close of s2
runAfterParkedAsync(s2::close);
// read from s1 should block, then read -1
int n = s1.getInputStream().read();
assertTrue(n == -1 );
}
});
}
/**
* Virtual thread blocks in read , peer closes connection abruptly .
*/
@Test
public void testSocketReadPeerClose2() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// delayed abrupt close of s2
s2.setSoLinger(true , 0 );
runAfterParkedAsync(s2::close);
// read from s1 should block, then throw
try {
int n = s1.getInputStream().read();
fail("read " + n);
} catch (IOException ioe) {
// expected
}
}
});
}
/**
* Socket close while virtual thread blocked in read .
*/
@Test
public void testSocketReadAsyncClose1() throws Exception {
testSocketReadAsyncClose(0 );
}
/**
* Socket close while virtual thread blocked in timed read .
*/
@Test
public void testSocketReadAsyncClose2() throws Exception {
testSocketReadAsyncClose(0 );
}
void testSocketReadAsyncClose(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s = connection.socket1();
// delayed close of s
runAfterParkedAsync(s::close);
// read from s should block, then throw
if (timeout > 0 ) {
s.setSoTimeout(timeout);
}
try {
int n = s.getInputStream().read();
fail("read " + n);
} catch (SocketException expected) { }
}
});
}
/**
* Virtual thread interrupted while blocked in Socket read .
*/
@Test
public void testSocketReadInterrupt1() throws Exception {
testSocketReadInterrupt(0 );
}
/**
* Virtual thread interrupted while blocked in Socket read with timeout
*/
@Test
public void testSocketReadInterrupt2() throws Exception {
testSocketReadInterrupt(60 _000 );
}
void testSocketReadInterrupt(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s = connection.socket1();
// delayed interrupt of current thread
Thread thisThread = Thread .currentThread();
runAfterParkedAsync(thisThread::interrupt);
// read from s should block, then throw
if (timeout > 0 ) {
s.setSoTimeout(timeout);
}
try {
int n = s.getInputStream().read();
fail("read " + n);
} catch (SocketException expected) {
assertTrue(Thread .interrupted());
assertTrue(s.isClosed());
}
}
});
}
/**
* Socket close while virtual thread blocked in write .
*/
@Test
public void testSocketWriteAsyncClose() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s = connection.socket1();
// delayedclose of s
runAfterParkedAsync(s::close);
// write to s should block, then throw
try {
byte [] ba = new byte [100 *1024 ];
OutputStream out = s.getOutputStream();
for (;;) {
out.write(ba);
}
} catch (SocketException expected) { }
}
});
}
/**
* Virtual thread interrupted while blocked in Socket write .
*/
@Test
public void testSocketWriteInterrupt() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s = connection.socket1();
// delayed interrupt of current thread
Thread thisThread = Thread .currentThread();
runAfterParkedAsync(thisThread::interrupt);
// write to s should block, then throw
try {
byte [] ba = new byte [100 *1024 ];
OutputStream out = s.getOutputStream();
for (;;) {
out.write(ba);
}
} catch (SocketException expected) {
assertTrue(Thread .interrupted());
assertTrue(s.isClosed());
}
}
});
}
/**
* Virtual thread reading urgent data when SO_OOBINLINE is enabled .
*/
@Test
public void testSocketReadUrgentData() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s1 = connection.socket1();
Socket s2 = connection.socket2();
// urgent data should be received
runAfterParkedAsync(() -> s2.sendUrgentData('X' ));
// read should block, then read the OOB byte
s1.setOOBInline(true );
byte [] ba = new byte [10 ];
int n = s1.getInputStream().read(ba);
assertTrue(n == 1 );
assertTrue(ba[0 ] == 'X' );
// urgent data should not be received
s1.setOOBInline(false );
s1.setSoTimeout(500 );
s2.sendUrgentData('X' );
try {
s1.getInputStream().read(ba);
fail();
} catch (SocketTimeoutException expected) { }
}
});
}
/**
* ServerSocket accept , no blocking .
*/
@Test
public void testServerSocketAccept1() throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket()) {
InetAddress loopback = InetAddress.getLoopbackAddress();
listener.bind(new InetSocketAddress(loopback, 0 ));
// establish connection
var socket1 = new Socket(loopback, listener.getLocalPort());
// accept should not block
var socket2 = listener.accept();
socket1.close();
socket2.close();
}
});
}
/**
* Virtual thread blocks in accept .
*/
@Test
public void testServerSocketAccept2() throws Exception {
testServerSocketAccept(0 );
}
/**
* Virtual thread blocks in timed accept .
*/
@Test
public void testServerSocketAccept3() throws Exception {
testServerSocketAccept(60 _000 );
}
void testServerSocketAccept(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket()) {
InetAddress loopback = InetAddress.getLoopbackAddress();
listener.bind(new InetSocketAddress(loopback, 0 ));
// schedule connect
var socket1 = new Socket();
SocketAddress remote = listener.getLocalSocketAddress();
runAfterParkedAsync(() -> socket1.connect(remote));
// accept should block
if (timeout > 0 ) {
listener.setSoTimeout(timeout);
}
var socket2 = listener.accept();
socket1.close();
socket2.close();
}
});
}
/**
* ServerSocket close while virtual thread blocked in accept .
*/
@Test
public void testServerSocketAcceptAsyncClose1() throws Exception {
testServerSocketAcceptAsyncClose(0 );
}
/**
* ServerSocket close while virtual thread blocked in timed accept .
*/
@Test
public void testServerSocketAcceptAsyncClose2() throws Exception {
testServerSocketAcceptAsyncClose(60 _000 );
}
void testServerSocketAcceptAsyncClose(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket()) {
InetAddress loopback = InetAddress.getLoopbackAddress();
listener.bind(new InetSocketAddress(loopback, 0 ));
// delayed close of listener
runAfterParkedAsync(listener::close);
// accept should block, then throw
if (timeout > 0 ) {
listener.setSoTimeout(timeout);
}
try {
listener.accept().close();
fail("connection accepted???" );
} catch (SocketException expected) { }
}
});
}
/**
* Virtual thread interrupted while blocked in ServerSocket accept .
*/
@Test
public void testServerSocketAcceptInterrupt1() throws Exception {
testServerSocketAcceptInterrupt(0 );
}
/**
* Virtual thread interrupted while blocked in ServerSocket accept with timeout .
*/
@Test
public void testServerSocketAcceptInterrupt2() throws Exception {
testServerSocketAcceptInterrupt(60 _000 );
}
void testServerSocketAcceptInterrupt(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket()) {
InetAddress loopback = InetAddress.getLoopbackAddress();
listener.bind(new InetSocketAddress(loopback, 0 ));
// delayed interrupt of current thread
Thread thisThread = Thread .currentThread();
runAfterParkedAsync(thisThread::interrupt);
// accept should block, then throw
if (timeout > 0 ) {
listener.setSoTimeout(timeout);
}
try {
listener.accept().close();
fail("connection accepted???" );
} catch (SocketException expected) {
assertTrue(Thread .interrupted());
assertTrue(listener.isClosed());
}
}
});
}
/**
* DatagramSocket receive / send , no blocking .
*/
@Test
public void testDatagramSocketSendReceive1() throws Exception {
VThreadRunner.run(() -> {
try (DatagramSocket s1 = new DatagramSocket(null );
DatagramSocket s2 = new DatagramSocket(null )) {
InetAddress lh = InetAddress.getLoopbackAddress();
s1.bind(new InetSocketAddress(lh, 0 ));
s2.bind(new InetSocketAddress(lh, 0 ));
// send should not block
byte [] bytes = "XXX" .getBytes("UTF-8" );
DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
p1.setSocketAddress(s2.getLocalSocketAddress());
s1.send(p1);
// receive should not block
byte [] ba = new byte [100 ];
DatagramPacket p2 = new DatagramPacket(ba, ba.length);
s2.receive(p2);
assertEquals(p2.getSocketAddress(), s1.getLocalSocketAddress());
assertTrue(ba[0 ] == 'X' );
}
});
}
/**
* Virtual thread blocks in DatagramSocket receive .
*/
@Test
public void testDatagramSocketSendReceive2() throws Exception {
testDatagramSocketSendReceive(0 );
}
/**
* Virtual thread blocks in DatagramSocket receive with timeout .
*/
@Test
public void testDatagramSocketSendReceive3() throws Exception {
testDatagramSocketSendReceive(60 _000 );
}
private void testDatagramSocketSendReceive(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (DatagramSocket s1 = new DatagramSocket(null );
DatagramSocket s2 = new DatagramSocket(null )) {
InetAddress lh = InetAddress.getLoopbackAddress();
s1.bind(new InetSocketAddress(lh, 0 ));
s2.bind(new InetSocketAddress(lh, 0 ));
// delayed send
byte [] bytes = "XXX" .getBytes("UTF-8" );
DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
p1.setSocketAddress(s2.getLocalSocketAddress());
runAfterParkedAsync(() -> s1.send(p1));
// receive should block
if (timeout > 0 ) {
s2.setSoTimeout(timeout);
}
byte [] ba = new byte [100 ];
DatagramPacket p2 = new DatagramPacket(ba, ba.length);
s2.receive(p2);
assertEquals(p2.getSocketAddress(), s1.getLocalSocketAddress());
assertTrue(ba[0 ] == 'X' );
}
});
}
/**
* Virtual thread blocks in DatagramSocket receive that times out .
*/
@Test
public void testDatagramSocketReceiveTimeout() throws Exception {
VThreadRunner.run(() -> {
try (DatagramSocket s = new DatagramSocket(null )) {
InetAddress lh = InetAddress.getLoopbackAddress();
s.bind(new InetSocketAddress(lh, 0 ));
s.setSoTimeout(500 );
byte [] ba = new byte [100 ];
DatagramPacket p = new DatagramPacket(ba, ba.length);
try {
s.receive(p);
fail();
} catch (SocketTimeoutException expected) { }
}
});
}
/**
* DatagramSocket close while virtual thread blocked in receive .
*/
@Test
public void testDatagramSocketReceiveAsyncClose1() throws Exception {
testDatagramSocketReceiveAsyncClose(0 );
}
/**
* DatagramSocket close while virtual thread blocked with timeout .
*/
@Test
public void testDatagramSocketReceiveAsyncClose2() throws Exception {
testDatagramSocketReceiveAsyncClose(60 _000 );
}
private void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (DatagramSocket s = new DatagramSocket(null )) {
InetAddress lh = InetAddress.getLoopbackAddress();
s.bind(new InetSocketAddress(lh, 0 ));
// delayed close of s
runAfterParkedAsync(s::close);
// receive should block, then throw
if (timeout > 0 ) {
s.setSoTimeout(timeout);
}
try {
byte [] ba = new byte [100 ];
DatagramPacket p = new DatagramPacket(ba, ba.length);
s.receive(p);
fail();
} catch (SocketException expected) { }
}
});
}
/**
* Virtual thread interrupted while blocked in DatagramSocket receive .
*/
@Test
public void testDatagramSocketReceiveInterrupt1() throws Exception {
testDatagramSocketReceiveInterrupt(0 );
}
/**
* Virtual thread interrupted while blocked in DatagramSocket receive with timeout .
*/
@Test
public void testDatagramSocketReceiveInterrupt2() throws Exception {
testDatagramSocketReceiveInterrupt(60 _000 );
}
private void testDatagramSocketReceiveInterrupt(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (DatagramSocket s = new DatagramSocket(null )) {
InetAddress lh = InetAddress.getLoopbackAddress();
s.bind(new InetSocketAddress(lh, 0 ));
// delayed interrupt of current thread
Thread thisThread = Thread .currentThread();
runAfterParkedAsync(thisThread::interrupt);
// receive should block, then throw
if (timeout > 0 ) {
s.setSoTimeout(timeout);
}
try {
byte [] ba = new byte [100 ];
DatagramPacket p = new DatagramPacket(ba, ba.length);
s.receive(p);
fail();
} catch (SocketException expected) {
assertTrue(Thread .interrupted());
assertTrue(s.isClosed());
}
}
});
}
/**
* Creates a loopback connection
*/
static class Connection implements Closeable {
private final Socket s1;
private final Socket s2;
Connection() throws IOException {
var lh = InetAddress.getLoopbackAddress();
try (var listener = new ServerSocket()) {
listener.bind(new InetSocketAddress(lh, 0 ));
Socket s1 = new Socket();
Socket s2;
try {
s1.connect(listener.getLocalSocketAddress(), 10 _000 );
s2 = listener.accept();
} catch (IOException ioe) {
s1.close();
throw ioe;
}
this .s1 = s1;
this .s2 = s2;
}
}
Socket socket1() {
return s1;
}
Socket socket2() {
return s2;
}
@Override
public void close() throws IOException {
s1.close();
s2.close();
}
}
@FunctionalInterface
interface ThrowingRunnable {
void run() throws Exception;
}
/**
* Runs the given task asynchronously after the current virtual thread has parked .
* @ return the thread started to run the task
*/
static Thread runAfterParkedAsync(ThrowingRunnable task) {
Thread target = Thread .currentThread();
if (!target.isVirtual())
throw new WrongThreadException();
return Thread .ofPlatform().daemon().start(() -> {
try {
Thread .State state = target.getState();
while (state != Thread .State.WAITING
&& state != Thread .State.TIMED_WAITING) {
Thread .sleep(20 );
state = target.getState();
}
Thread .sleep(20 ); // give a bit more time to release carrier
task.run();
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
Messung V0.5 in Prozent C=91 H=63 G=77
¤ Dauer der Verarbeitung: 0.16 Sekunden
¤
*© Formatika GbR, Deutschland