/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
*/ package org.apache.tomcat.websocket;
privatefinal Log log = LogFactory.getLog(WsSession.class); // must not be static privatestaticfinal StringManager sm = StringManager.getManager(WsSession.class);
// An ellipsis is a single character that looks like three periods in a row // and is used to indicate a continuation. privatestaticfinalbyte[] ELLIPSIS_BYTES = "\u2026".getBytes(StandardCharsets.UTF_8); // An ellipsis is three bytes in UTF-8 privatestaticfinalint ELLIPSIS_BYTES_LEN = ELLIPSIS_BYTES.length;
static {
    // A fake end point class and path are used here. They are never used
    // for real work; they only need to pass the builder's validation.
    ServerEndpointConfig sec = ServerEndpointConfig.Builder.create(Object.class, "/").build();
    // Record whether the configurator obtained from a default-built config
    // is exactly the implementation's default configurator class.
    Class<?> configuratorType = sec.getConfigurator().getClass();
    SEC_CONFIGURATOR_USES_IMPL_DEFAULT = configuratorType.equals(DefaultServerEndpointConfigurator.class);
}
// Expected to handle message types of <String> only privatevolatile MessageHandler textMessageHandler = null; // Expected to handle message types of <ByteBuffer> only privatevolatile MessageHandler binaryMessageHandler = null; privatevolatile MessageHandler.Whole<PongMessage> pongMessageHandler = null; private AtomicReference<State> state = new AtomicReference<>(State.OPEN); privatefinal Map<String, Object> userProperties = new ConcurrentHashMap<>(); privatevolatileint maxBinaryMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE; privatevolatileint maxTextMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE; privatevolatilelong maxIdleTimeout = 0; privatevolatilelong lastActiveRead = System.currentTimeMillis(); privatevolatilelong lastActiveWrite = System.currentTimeMillis(); private Map<FutureToSendHandler, FutureToSendHandler> futures = new ConcurrentHashMap<>();
/**
 * Creates a new WebSocket session for communication between the provided client and remote end points. The
 * result of {@link Thread#getContextClassLoader()} at the time this constructor is called is captured and will be
 * used when calling {@link Endpoint#onClose(Session, CloseReason)}.
 * <p>
 * Request related attributes (URI, parameters, query string, principal and HTTP session ID) are left empty since
 * this constructor is for client sessions. Buffer sizes, the idle timeout and the send timeout are taken from the
 * container defaults.
 *
 * @param clientEndpointHolder The end point managed by this code
 * @param wsRemoteEndpoint     The other / remote end point
 * @param wsWebSocketContainer The container that created this session
 * @param negotiatedExtensions The agreed extensions to use for this session
 * @param subProtocol          The agreed sub-protocol to use for this session; <code>null</code> is stored as the
 *                                 empty string
 * @param pathParameters       The path parameters associated with the request that initiated this session or
 *                                 <code>null</code> if this is a client session
 * @param secure               Was this session initiated over a secure connection?
 * @param clientEndpointConfig The configuration information for the client end point
 *
 * @throws DeploymentException if an invalid encoder is specified
 */
public WsSession(ClientEndpointHolder clientEndpointHolder, WsRemoteEndpointImplBase wsRemoteEndpoint,
        WsWebSocketContainer wsWebSocketContainer, List<Extension> negotiatedExtensions, String subProtocol,
        Map<String, String> pathParameters, boolean secure, ClientEndpointConfig clientEndpointConfig)
        throws DeploymentException {
    // Wire up the remote end point and its async / basic facades
    this.wsRemoteEndpoint = wsRemoteEndpoint;
    this.wsRemoteEndpoint.setSession(this);
    this.remoteEndpointAsync = new WsRemoteEndpointAsync(wsRemoteEndpoint);
    this.remoteEndpointBasic = new WsRemoteEndpointBasic(wsRemoteEndpoint);
    this.webSocketContainer = wsWebSocketContainer;

    // Remember the class loader in effect at creation time
    this.applicationClassLoader = Thread.currentThread().getContextClassLoader();

    // Apply the container defaults
    wsRemoteEndpoint.setSendTimeout(wsWebSocketContainer.getDefaultAsyncSendTimeout());
    this.maxBinaryMessageBufferSize = this.webSocketContainer.getDefaultMaxBinaryMessageBufferSize();
    this.maxTextMessageBufferSize = this.webSocketContainer.getDefaultMaxTextMessageBufferSize();
    this.maxIdleTimeout = this.webSocketContainer.getDefaultMaxSessionIdleTimeout();

    // No HTTP request information is recorded by this constructor
    this.requestUri = null;
    this.requestParameterMap = Collections.emptyMap();
    this.queryString = null;
    this.userPrincipal = null;
    this.httpSessionId = null;

    this.negotiatedExtensions = negotiatedExtensions;
    // A missing sub-protocol is represented by the empty string
    this.subProtocol = (subProtocol == null) ? "" : subProtocol;
    this.pathParameters = pathParameters;
    this.secure = secure;
    this.wsRemoteEndpoint.setEncoders(clientEndpointConfig);
    this.endpointConfig = clientEndpointConfig;

    if (log.isDebugEnabled()) {
        log.debug(sm.getString("wsSession.created", id));
    }
}
/**
 * Creates a new WebSocket session for communication between the provided server and remote end points. The result
 * of {@link Thread#getContextClassLoader()} at the time this constructor is called will be used when calling
 * {@link Endpoint#onClose(Session, CloseReason)}.
 *
 * @param wsRemoteEndpoint     The other / remote end point
 * @param wsWebSocketContainer The container that created this session
 * @param requestUri           The URI used to connect to this end point or <code>null</code> if this is a client
 *                                 session
 * @param requestParameterMap  The parameters associated with the request that initiated this session or
 *                                 <code>null</code> if this is a client session
 * @param queryString          The query string associated with the request that initiated this session or
 *                                 <code>null</code> if this is a client session
 * @param userPrincipal        The principal associated with the request that initiated this session or
 *                                 <code>null</code> if this is a client session
 * @param httpSessionId        The HTTP session ID associated with the request that initiated this session or
 *                                 <code>null</code> if this is a client session
 * @param negotiatedExtensions The agreed extensions to use for this session
 * @param subProtocol          The agreed sub-protocol to use for this session
 * @param pathParameters       The path parameters associated with the request that initiated this session or
 *                                 <code>null</code> if this is a client session
 * @param secure               Was this session initiated over a secure connection?
 * @param serverEndpointConfig The configuration information for the server end point
 *
 * @throws DeploymentException if an invalid encoder is specified
*/ public WsSession(WsRemoteEndpointImplBase wsRemoteEndpoint, WsWebSocketContainer wsWebSocketContainer,
URI requestUri, Map<String, List<String>> requestParameterMap, String queryString, Principal userPrincipal,
String httpSessionId, List<Extension> negotiatedExtensions, String subProtocol,
Map<String, String> pathParameters, boolean secure, ServerEndpointConfig serverEndpointConfig) throws DeploymentException {
// Message handlers that require decoders may map to text messages, // binary messages, both or neither.
// The frame processing code expects binary message handlers to // accept ByteBuffer
// Use the POJO message handler wrappers as they are designed to wrap // arbitrary objects with MessageHandlers and can wrap MessageHandlers // just as easily.
if (!removed) { // ISE for now. Could swallow this silently / log this if the ISE // becomes a problem thrownew IllegalStateException(sm.getString("wsSession.removeHandlerFailed", listener));
}
}
/**
 * Returns the WebSocket protocol version constant used by this implementation.
 *
 * @throws IllegalStateException if the session is closed
 */
@Override
public String getProtocolVersion() {
    // Reject calls on a closed session before exposing any state
    checkState();
    return Constants.WS_VERSION_HEADER_VALUE;
}
/**
 * Returns the sub-protocol stored for this session.
 *
 * @throws IllegalStateException if the session is closed
 */
@Override
public String getNegotiatedSubprotocol() {
    // Reject calls on a closed session before exposing any state
    checkState();
    return subProtocol;
}
/**
 * Returns the list of extensions stored for this session. The internal list is returned as-is (no copy).
 *
 * @throws IllegalStateException if the session is closed
 */
@Override
public List<Extension> getNegotiatedExtensions() {
    // Reject calls on a closed session before exposing any state
    checkState();
    return negotiatedExtensions;
}
/**
 * WebSocket 1.0. Section 2.1.5. An internal close method is needed because the specification requires that the
 * local endpoint receives a 1006 on timeout, so the reason sent to the peer and the reason reported locally may
 * differ. This overload does not force the socket closed.
 *
 * @param closeReasonMessage The close reason to pass to the remote endpoint
 * @param closeReasonLocal   The close reason to pass to the local endpoint
 */
public void doClose(CloseReason closeReasonMessage, CloseReason closeReasonLocal) {
    // Delegate to the full variant without requesting an immediate socket close
    final boolean closeSocket = false;
    doClose(closeReasonMessage, closeReasonLocal, closeSocket);
}
/**
 * WebSocket 1.0. Section 2.1.5. An internal close method is needed because the specification requires that the
 * local endpoint receives a 1006 on timeout, so the reason sent to the peer and the reason reported locally may
 * differ.
 * <p>
 * Only the first caller that moves the session out of the OPEN state performs the close; later calls return
 * immediately.
 *
 * @param closeReasonMessage The close reason to pass to the remote endpoint
 * @param closeReasonLocal   The close reason to pass to the local endpoint
 * @param closeSocket        Should the socket be closed immediately rather than waiting for the server to respond
 */
public void doClose(CloseReason closeReasonMessage, CloseReason closeReasonLocal, boolean closeSocket) {
    boolean closeStartedHere = state.compareAndSet(State.OPEN, State.OUTPUT_CLOSING);
    if (!closeStartedHere) {
        // Close process has already been started. Don't start it again.
        return;
    }

    if (log.isDebugEnabled()) {
        log.debug(sm.getString("wsSession.doClose", id));
    }

    // Tell the remote endpoint first, then notify the local endpoint.
    sendCloseMessage(closeReasonMessage);
    fireEndpointOnClose(closeReasonLocal);

    // The state transition is always attempted, whatever closeSocket is.
    boolean awaitingRemoteClose = state.compareAndSet(State.OUTPUT_CLOSING, State.OUTPUT_CLOSED);
    if (closeSocket || !awaitingRemoteClose) {
        /*
         * A close message was received in another thread or this is handling an error condition. Either way, no
         * further close message is expected to be received. Mark the session as fully closed and close the
         * network connection.
         */
        state.set(State.CLOSED);
        wsRemoteEndpoint.close();
    }

    // Fail any uncompleted messages with a shared error result.
    SendResult failure = new SendResult(new IOException(sm.getString("wsSession.messageFailed")));
    for (FutureToSendHandler pending : futures.keySet()) {
        pending.onResult(failure);
    }
}
/** * Called when a close message is received. Should only ever happen once. Also called after a protocol error when * the ProtocolHandler needs to force the closing of the connection. * * @param closeReason The reason contained within the received close message.
*/ publicvoid onClose(CloseReason closeReason) { if (state.compareAndSet(State.OPEN, State.CLOSING)) { // Standard close.
// Send the close message response to the remote endpoint.
sendCloseMessage(closeReason);
fireEndpointOnClose(closeReason);
// Mark the session as fully closed.
state.set(State.CLOSED);
// Close the network connection.
wsRemoteEndpoint.close();
} elseif (state.compareAndSet(State.OUTPUT_CLOSING, State.CLOSING)) { /* * The local endpoint sent a close message the the same time as the remote endpoint. The local close is * still being processed. Update the state so the the local close process will also close the network * connection once it has finished sending a close message.
*/
} elseif (state.compareAndSet(State.OUTPUT_CLOSED, State.CLOSED)) { /* * The local endpoint sent the first close message. The remote endpoint has now responded with its own close * message so mark the session as fully closed and close the network connection.
*/
wsRemoteEndpoint.close();
} // CLOSING and CLOSED are NO-OPs
}
// Fire the onError event Thread t = Thread.currentThread();
ClassLoader cl = t.getContextClassLoader();
t.setContextClassLoader(applicationClassLoader); try {
localEndpoint.onError(this, throwable);
} finally {
t.setContextClassLoader(cl);
}
}
privatevoid sendCloseMessage(CloseReason closeReason) { // 125 is maximum size for the payload of a control message
ByteBuffer msg = ByteBuffer.allocate(125);
CloseCode closeCode = closeReason.getCloseCode(); // CLOSED_ABNORMALLY should not be put on the wire if (closeCode == CloseCodes.CLOSED_ABNORMALLY) { // PROTOCOL_ERROR is probably better than GOING_AWAY here
msg.putShort((short) CloseCodes.PROTOCOL_ERROR.getCode());
} else {
msg.putShort((short) closeCode.getCode());
}
String reason = closeReason.getReasonPhrase(); if (reason != null && reason.length() > 0) {
appendCloseReasonWithTruncation(msg, reason);
}
msg.flip(); try {
wsRemoteEndpoint.sendMessageBlock(Constants.OPCODE_CLOSE, msg, true);
} catch (IOException | IllegalStateException e) { // Failed to send close message. Close the socket and let the caller // deal with the Exception if (log.isDebugEnabled()) {
log.debug(sm.getString("wsSession.sendCloseFail", id), e);
}
wsRemoteEndpoint.close(); // Failure to send a close message is not unexpected in the case of // an abnormal closure (usually triggered by a failure to read/write // from/to the client. In this case do not trigger the endpoint's // error handling if (closeCode != CloseCodes.CLOSED_ABNORMALLY) {
localEndpoint.onError(this, e);
}
} finally {
webSocketContainer.unregisterSession(getSessionMapKey(), this);
}
}
/** * Use protected so unit tests can access this method directly. * * @param msg The message * @param reason The reason
*/ protectedstaticvoid appendCloseReasonWithTruncation(ByteBuffer msg, String reason) { // Once the close code has been added there are a maximum of 123 bytes // left for the reason phrase. If it is truncated then care needs to be // taken to ensure the bytes are not truncated in the middle of a // multi-byte UTF-8 character. byte[] reasonBytes = reason.getBytes(StandardCharsets.UTF_8);
if (reasonBytes.length <= 123) { // No need to truncate
msg.put(reasonBytes);
} else { // Need to truncate int remaining = 123 - ELLIPSIS_BYTES_LEN; int pos = 0; byte[] bytesNext = reason.substring(pos, pos + 1).getBytes(StandardCharsets.UTF_8); while (remaining >= bytesNext.length) {
msg.put(bytesNext);
remaining -= bytesNext.length;
pos++;
bytesNext = reason.substring(pos, pos + 1).getBytes(StandardCharsets.UTF_8);
}
msg.put(ELLIPSIS_BYTES);
}
}
/**
 * Make the session aware of a {@link FutureToSendHandler} that will need to be forcibly closed if the session
 * closes before the {@link FutureToSendHandler} completes.
 * <p>
 * If the session is no longer open and the handler has not completed, the handler is completed here with an
 * error result so that client code waiting on it does not hang.
 *
 * @param f2sh The handler
 */
protected void registerFuture(FutureToSendHandler f2sh) {
    // Ideally, this code should sync on stateLock so that the correct
    // action is taken based on the current state of the connection.
    // However, a sync on stateLock can't be used here as it will create the
    // possibility of a dead-lock. See BZ 61183.
    // Therefore, a slightly less efficient approach is used: always
    // register first, then inspect the state.
    futures.put(f2sh, f2sh);

    // Nothing more to do if either:
    // - the session is open, so the future is registered with an open
    //   session and normal processing continues; or
    // - the session is closing / closed but the future has completed. It is
    //   not known if it was completed normally by the I/O layer or in error
    //   by doClose(). It doesn't matter which.
    if (isOpen() || f2sh.isDone()) {
        return;
    }

    // The session is closing / closed and the future had not completed when
    // last checked. It may have completed since, or it may have been
    // registered too late to be cleaned up during session close. Complete
    // it with an error result so that it is guaranteed to complete. This is
    // slightly inefficient since another thread may have completed it or be
    // about to, but knowing that requires the sync on stateLock (see above).
    // Note: If multiple attempts are made to complete the Future, the
    // second and subsequent attempts are ignored.
    SendResult failure = new SendResult(new IOException(sm.getString("wsSession.messageFailed")));
    f2sh.onResult(failure);
}
/**
 * Stop tracking a {@link FutureToSendHandler}. The handler is removed from the set of tracked instances; nothing
 * happens if it was not being tracked.
 *
 * @param f2sh The handler
 */
protected void unregisterFuture(FutureToSendHandler f2sh) {
    futures.remove(f2sh);
}
/**
 * Returns the request URI stored for this session.
 *
 * @throws IllegalStateException if the session is closed
 */
@Override
public URI getRequestURI() {
    // Reject calls on a closed session before exposing any state
    checkState();
    return requestUri;
}
/**
 * Returns the request parameter map stored for this session. The internal map is returned as-is (no copy).
 *
 * @throws IllegalStateException if the session is closed
 */
@Override
public Map<String, List<String>> getRequestParameterMap() {
    // Reject calls on a closed session before exposing any state
    checkState();
    return requestParameterMap;
}
/**
 * Returns the query string stored for this session.
 *
 * @throws IllegalStateException if the session is closed
 */
@Override
public String getQueryString() {
    // Reject calls on a closed session before exposing any state
    checkState();
    return queryString;
}
/**
 * Returns the user principal stored for this session.
 *
 * @throws IllegalStateException if the session is closed
 */
@Override
public Principal getUserPrincipal() {
    // Reject calls on a closed session before exposing any state
    checkState();
    return userPrincipal;
}
/**
 * Returns the path parameters stored for this session. The internal map is returned as-is (no copy).
 *
 * @throws IllegalStateException if the session is closed
 */
@Override
public Map<String, String> getPathParameters() {
    // Reject calls on a closed session before exposing any state
    checkState();
    return pathParameters;
}
/**
 * Returns the identifier of this session. Unlike most accessors, this one does not check whether the session is
 * closed.
 */
@Override
public String getId() {
    return id;
}
/**
 * Returns the mutable, per-session user properties map. The internal map is returned as-is (no copy).
 *
 * @throws IllegalStateException if the session is closed
 */
@Override
public Map<String, Object> getUserProperties() {
    // Reject calls on a closed session before exposing any state
    checkState();
    return userProperties;
}
/**
 * Returns the local endpoint associated with this session. No closed-session check is performed.
 *
 * @return the local endpoint
 */
public Endpoint getLocal() {
    return localEndpoint;
}
/**
 * Returns the HTTP session ID stored for this session. No closed-session check is performed.
 *
 * @return the stored HTTP session ID, which may be <code>null</code>
 */
public String getHttpSessionId() {
    return httpSessionId;
}
protectedvoid checkExpiration() { // Local copies to ensure consistent behaviour during method execution long timeout = maxIdleTimeout; long timeoutRead = getMaxIdleTimeoutRead(); long timeoutWrite = getMaxIdleTimeoutWrite();
long currentTime = System.currentTimeMillis();
String key = null;
privatevoid checkState() { if (isClosed()) { /* * As per RFC 6455, a WebSocket connection is considered to be closed once a peer has sent and received a * WebSocket close frame.
*/ thrownew IllegalStateException(sm.getString("wsSession.closed", id));
}
}
privateenum State {
OPEN,
OUTPUT_CLOSING,
OUTPUT_CLOSED,
CLOSING,
CLOSED
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.