/* * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions.
*/
/* * @test * @bug 8201186 * @summary Tests an asynchronous BodySubscriber that completes * immediately with a Publisher<List<ByteBuffer>> * @library /test/lib http2/server * @build jdk.test.lib.net.SimpleSSLContext * @modules java.base/sun.net.www.http * java.net.http/jdk.internal.net.http.common * java.net.http/jdk.internal.net.http.frame * java.net.http/jdk.internal.net.http.hpack * @run testng/othervm ResponsePublisher
*/
staticfinalint ITERATION_COUNT = 3; // a shared executor helps reduce the amount of threads created by the test staticfinal Executor executor = Executors.newCachedThreadPool();
HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
.build();
BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler); try {
response.body().subscribe(null); thrownew RuntimeException("Expected NPE not thrown");
} catch (NullPointerException x) {
System.out.println("Got expected NPE: " + x);
} // We can reuse our BodySubscribers implementations to subscribe to the // Publisher<List<ByteBuffer>>
BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
response.body().subscribe(ofString);
BodySubscriber<String> ofString2 = BodySubscribers.ofString(UTF_8);
response.body().subscribe(ofString2); try {
ofString2.getBody().toCompletableFuture().join(); thrownew RuntimeException("Expected ISE not thrown");
} catch (CompletionException x) {
Throwable cause = x.getCause(); if (cause instanceof IllegalStateException) {
System.out.println("Got expected ISE: " + cause);
} else { throw x;
}
} // Get the final result and compare it with the expected body
String body = ofString.getBody().toCompletableFuture().get();
assertEquals(body, "");
}
}
HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
.build();
BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler); // We can reuse our BodySubscribers implementations to subscribe to the // Publisher<List<ByteBuffer>>
BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8); // get the Publisher<List<ByteBuffer>> and // subscribe to it.
response.body().subscribe(ofString); // Get the final result and compare it with the expected body
String body = ofString.getBody().toCompletableFuture().get();
assertEquals(body, "");
}
}
HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
.build();
BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get(); // We can reuse our BodySubscribers implementations to subscribe to the // Publisher<List<ByteBuffer>>
BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
CompletableFuture<String> result =
client.sendAsync(req, handler).thenCompose(
(responsePublisher) -> { // get the Publisher<List<ByteBuffer>> and // subscribe to it.
responsePublisher.body().subscribe(ofString); return ofString.getBody();
}); // Get the final result and compare it with the expected body
assertEquals(result.get(), "");
}
}
HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody"))
.build();
BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler); // We can reuse our BodySubscribers implementations to subscribe to the // Publisher<List<ByteBuffer>>
BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8); // get the Publisher<List<ByteBuffer>> and // subscribe to it.
response.body().subscribe(ofString); // Get the final result and compare it with the expected body
String body = ofString.getBody().toCompletableFuture().get();
assertEquals(body, WITH_BODY);
}
}
HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody"))
.build();
BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get(); // We can reuse our BodySubscribers implementations to subscribe to the // Publisher<List<ByteBuffer>>
BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
CompletableFuture<String> result = client.sendAsync(req, handler)
.thenCompose((responsePublisher) -> { // get the Publisher<List<ByteBuffer>> and // subscribe to it.
responsePublisher.body().subscribe(ofString); return ofString.getBody();
}); // Get the final result and compare it with the expected body
String body = result.get();
assertEquals(body, WITH_BODY);
}
}
// A BodyHandler that returns PublishingBodySubscriber instances staticclass PublishingBodyHandler implements BodyHandler<Publisher<List<ByteBuffer>>> {
@Override public BodySubscriber<Publisher<List<ByteBuffer>>> apply(HttpResponse.ResponseInfo rinfo) {
assertEquals(rinfo.statusCode(), 200); returnnew PublishingBodySubscriber();
}
}
// A BodySubscriber that returns a Publisher<List<ByteBuffer>> staticclass PublishingBodySubscriber implements BodySubscriber<Publisher<List<ByteBuffer>>> { privatefinal CompletableFuture<Flow.Subscription> subscriptionCF = new CompletableFuture<>(); privatefinal CompletableFuture<Flow.Subscriber<? super List<ByteBuffer>>> subscribedCF = newCompletableFuture<>(); private AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>(); privatefinal CompletionStage<Publisher<List<ByteBuffer>>> body =
subscriptionCF.thenCompose((s) -> CompletableFuture.completedStage(this::subscribe)); //CompletableFuture.completedStage(this::subscribe);
privatevoid subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
Objects.requireNonNull(subscriber, "subscriber must not be null"); if (subscriberRef.compareAndSet(null, subscriber)) {
subscriptionCF.thenAccept((s) -> {
subscriber.onSubscribe(s);
subscribedCF.complete(subscriber);
});
} else {
subscriber.onSubscribe(new Flow.Subscription() {
@Override publicvoid request(long n) { }
@Override publicvoid cancel() { }
});
subscriber.onError( new IllegalStateException("This publisher has already one subscriber"));
}
}
@Override publicvoid onNext(List<ByteBuffer> item) { assert subscriptionCF.isDone(); // cannot be called before onSubscribe()
Flow.Subscriber<? super List<ByteBuffer>> subscriber = subscriberRef.get(); assert subscriber != null; // cannot be called before subscriber calls request(1)
subscriber.onNext(item);
}
@Override publicvoid onError(Throwable throwable) { assert subscriptionCF.isDone(); // cannot be called before onSubscribe() // onError can be called before request(1), and therefore can // be called before subscriberRef is set.
subscribedCF.thenAccept(s -> s.onError(throwable));
}
@Override publicvoid onComplete() { assert subscriptionCF.isDone(); // cannot be called before onSubscribe() // onComplete can be called before request(1), and therefore can // be called before subscriberRef is set.
subscribedCF.thenAccept(s -> s.onComplete());
}
@Override public CompletionStage<Publisher<List<ByteBuffer>>> getBody() { return body;
}
}
staticfinal String WITH_BODY = "Lorem ipsum dolor sit amet, consectetur" + " adipiscing elit, sed do eiusmod tempor incididunt ut labore et" + " dolore magna aliqua. Ut enim ad minim veniam, quis nostrud" + " exercitation ullamco laboris nisi ut aliquip ex ea" + " commodo consequat. Duis aute irure dolor in reprehenderit in " + "voluptate velit esse cillum dolore eu fugiat nulla pariatur." + " Excepteur sint occaecat cupidatat non proident, sunt in culpa qui" + " officia deserunt mollit anim id est laborum.";
staticclass HTTP_FixedLengthHandler implements HttpTestHandler {
@Override publicvoid handle(HttpTestExchange t) throws IOException {
out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI()); try (InputStream is = t.getRequestBody()) {
is.readAllBytes();
} if (t.getRequestURI().getPath().endsWith("/withBody")) { byte[] bytes = WITH_BODY.getBytes(UTF_8);
t.sendResponseHeaders(200, bytes.length); // body try (OutputStream os = t.getResponseBody()) {
os.write(bytes);
}
} else {
t.sendResponseHeaders(200, 0); //no body
}
}
}
staticclass HTTP_VariableLengthHandler implements HttpTestHandler {
@Override publicvoid handle(HttpTestExchange t) throws IOException {
out.println("HTTP_VariableLengthHandler received request to " + t.getRequestURI()); try (InputStream is = t.getRequestBody()) {
is.readAllBytes();
}
t.sendResponseHeaders(200, -1); //chunked or variable if (t.getRequestURI().getPath().endsWith("/withBody")) { byte[] bytes = WITH_BODY.getBytes(UTF_8); try (OutputStream os = t.getResponseBody()) { int chunkLen = bytes.length/10; if (chunkLen == 0) {
os.write(bytes);
} else { int count = 0; for (int i=0; i<10; i++) {
os.write(bytes, count, chunkLen);
os.flush();
count += chunkLen;
}
os.write(bytes, count, bytes.length % chunkLen);
count += bytes.length % chunkLen; assert count == bytes.length;
}
}
} else {
t.getResponseBody().close(); // no body
}
}
}
}
¤ Dauer der Verarbeitung: 0.15 Sekunden
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.