/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
*/ package org.apache.catalina.ha.session;
/** * The DeltaManager manages replicated sessions by only replicating the deltas in data. For applications written to * handle this, the DeltaManager is the optimal way of replicating data. * <p> * This code is almost identical to StandardManager with a difference in how it persists sessions and some modifications * to it. * <p> * <b>IMPLEMENTATION NOTE </b>: Correct behavior of session storing and reloading depends upon external calls to the * <code>start()</code> and <code>stop()</code> methods of this class at the correct times. * * @author Craig R. McClanahan * @author Peter Rossbach
*/ publicclass DeltaManager extends ClusterManagerBase {
/** * @return <code>true</code> if the state transfer is complete.
*/ publicboolean getStateTransferred() { return stateTransferred;
}
/** * Set that state transferred is complete * * @param stateTransferred Flag value
*/ publicvoid setStateTransferred(boolean stateTransferred) { this.stateTransferred = stateTransferred;
}
/** * @return the sendAllSessionsWaitTime in msec
*/ publicint getSendAllSessionsWaitTime() { return sendAllSessionsWaitTime;
}
/** * @param sendAllSessionsWaitTime The sendAllSessionsWaitTime to set at msec.
*/ publicvoid setSendAllSessionsWaitTime(int sendAllSessionsWaitTime) { this.sendAllSessionsWaitTime = sendAllSessionsWaitTime;
}
/** * @param isTimestampDrop The new flag value
*/ publicvoid setStateTimestampDrop(boolean isTimestampDrop) { this.stateTimestampDrop = isTimestampDrop;
}
// --------------------------------------------------------- Public Methods
@Override public Session createSession(String sessionId) { return createSession(sessionId, true);
}
/** * Create new session with check maxActiveSessions and send session creation to other cluster nodes. * * @param sessionId The session id that should be used for the session * @param distribute <code>true</code> to replicate the new session * * @return The session
*/ public Session createSession(String sessionId, boolean distribute) {
DeltaSession session = (DeltaSession) super.createSession(sessionId); if (distribute) {
sendCreateSession(session.getId(), session);
} if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.createSession.newSession", session.getId(),
Integer.valueOf(sessions.size())));
} return session;
}
/** * Send create session event to all backup node * * @param sessionId The session id of the session * @param session The session object
*/ protectedvoid sendCreateSession(String sessionId, DeltaSession session) { if (cluster.getMembers().length > 0) {
SessionMessage msg = new SessionMessageImpl(getName(), SessionMessage.EVT_SESSION_CREATED, null, sessionId,
sessionId + "-" + System.currentTimeMillis()); if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.sendMessage.newSession", name, sessionId));
}
msg.setTimestamp(session.getCreationTime());
counterSend_EVT_SESSION_CREATED++;
send(msg);
}
}
/** * Send messages to other backup member (domain or all) * * @param msg Session message
*/ protectedvoid send(SessionMessage msg) { if (cluster != null) {
cluster.send(msg);
}
}
/** * {@inheritDoc} * <p> * Creates new DeltaSession instance.
*/
@Override public Session createEmptySession() { returnnew DeltaSession(this);
}
@Override public String rotateSessionId(Session session) { return rotateSessionId(session, true);
}
/** * serialize sessionID * * @param sessionId Session id to serialize * * @return byte array with serialized session id * * @throws IOException if an input/output error occurs
*/ protectedbyte[] serializeSessionId(String sessionId) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeUTF(sessionId);
oos.flush();
oos.close(); return bos.toByteArray();
}
/** * Load sessionID * * @param data serialized session id * * @return session id * * @throws IOException if an input/output error occurs
*/ protected String deserializeSessionId(byte[] data) throws IOException {
ReplicationStream ois = getReplicationStream(data);
String sessionId = ois.readUTF();
ois.close(); return sessionId;
}
/** * Load sessions from other cluster node. * <p> * FIXME replace currently sessions with same id without notification. * <p> * FIXME SSO handling is not really correct with the session replacement! * * @param data Serialized data * * @exception ClassNotFoundException if a serialized class cannot be found during the reload * @exception IOException if an input/output error occurs
*/ protectedvoid deserializeSessions(byte[] data) throws ClassNotFoundException, IOException {
// Open an input stream to the specified pathname, if any // Load the previously unloaded active sessions try (ObjectInputStream ois = getReplicationStream(data)) {
Integer count = (Integer) ois.readObject(); int n = count.intValue(); for (int i = 0; i < n; i++) {
DeltaSession session = (DeltaSession) createEmptySession();
session.readObjectData(ois);
session.setManager(this);
session.setValid(true);
session.setPrimarySession(false); // in case the nodes in the cluster are out of // time synch, this will make sure that we have the // correct timestamp, isValid returns true, cause // accessCount=1
session.access(); // make sure that the session gets ready to expire if // needed
session.setAccessCount(0);
session.resetDeltaRequest(); // FIXME How inform other session id cache like SingleSignOn // increment sessionCounter to correct stats report if (findSession(session.getIdInternal()) == null) {
sessionCounter++;
} else {
sessionReplaceCounter++; // FIXME better is to grap this sessions again ! if (log.isWarnEnabled()) {
log.warn(sm.getString("deltaManager.loading.existing.session", session.getIdInternal()));
}
}
add(session); if (notifySessionListenersOnReplication) {
session.tellNew();
}
}
} catch (ClassNotFoundException e) {
log.error(sm.getString("deltaManager.loading.cnfe", e), e); throw e;
} catch (IOException e) {
log.error(sm.getString("deltaManager.loading.ioe", e), e); throw e;
}
}
/** * Save any currently active sessions in the appropriate persistence mechanism, if any. If persistence is not * supported, this method returns without doing anything. * * @param currentSessions Sessions to serialize * * @return serialized data * * @exception IOException if an input/output error occurs
*/ protectedbyte[] serializeSessions(Session[] currentSessions) throws IOException {
// Open an output stream to the specified pathname, if any
ByteArrayOutputStream fos = new ByteArrayOutputStream(); try (ObjectOutputStream oos = new ObjectOutputStream(new BufferedOutputStream(fos))) {
oos.writeObject(Integer.valueOf(currentSessions.length)); for (Session currentSession : currentSessions) {
((DeltaSession) currentSession).writeObjectData(oos);
} // Flush and close the output stream
oos.flush();
} catch (IOException e) {
log.error(sm.getString("deltaManager.unloading.ioe", e), e); throw e;
}
// send object data as byte[] return fos.toByteArray();
}
/** * Start this component and implement the requirements of * {@link org.apache.catalina.util.LifecycleBase#startInternal()}. * * @exception LifecycleException if this component detects a fatal error that prevents this component from being * used
*/
@Override protectedsynchronizedvoid startInternal() throws LifecycleException {
super.startInternal();
// Load unloaded sessions, if any try { if (cluster == null) {
log.error(sm.getString("deltaManager.noCluster", getName())); return;
} else { if (log.isInfoEnabled()) {
String type = "unknown"; if (cluster.getContainer() instanceof Host) {
type = "Host";
} elseif (cluster.getContainer() instanceof Engine) {
type = "Engine";
}
log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
}
} if (log.isInfoEnabled()) {
log.info(sm.getString("deltaManager.startClustering", getName()));
}
/** * get from first session master the backup from all clustered sessions * * @see #findSessionMasterMember()
*/ publicsynchronizedvoid getAllClusterSessions() { if (cluster != null && cluster.getMembers().length > 0) { long beforeSendTime = System.currentTimeMillis();
Member mbr = findSessionMasterMember(); if (mbr == null) { // No domain member found return;
}
SessionMessage msg = new SessionMessageImpl(this.getName(), SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL", "GET-ALL-" + getName());
msg.setTimestamp(beforeSendTime); // set reference time
stateTransferCreateSendTime = beforeSendTime; // request session state
counterSend_EVT_GET_ALL_SESSIONS++;
stateTransferred = false; // FIXME This send call block the deploy thread, when sender waitForAck is enabled try { synchronized (receivedMessageQueue) {
receiverQueue = true;
}
cluster.send(msg, mbr, Channel.SEND_OPTIONS_ASYNCHRONOUS); if (log.isInfoEnabled()) {
log.info(sm.getString("deltaManager.waitForSessionState", getName(), mbr,
Integer.valueOf(getStateTransferTimeout())));
} // FIXME At sender ack mode this method check only the state // transfer and resend is a problem!
waitForSendAllSessions(beforeSendTime);
} finally { synchronized (receivedMessageQueue) { for (SessionMessage smsg : receivedMessageQueue) { if (!stateTimestampDrop) {
messageReceived(smsg, smsg.getAddress());
} else { if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS &&
smsg.getTimestamp() >= stateTransferCreateSendTime) { // FIXME handle EVT_GET_ALL_SESSIONS later
messageReceived(smsg, smsg.getAddress());
} else { if (log.isWarnEnabled()) {
log.warn(sm.getString("deltaManager.dropMessage", getName(),
smsg.getEventTypeString(), new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp())));
}
}
}
}
receivedMessageQueue.clear();
receiverQueue = false;
}
}
} else { if (log.isInfoEnabled()) {
log.info(sm.getString("deltaManager.noMembers", getName()));
}
}
}
/** * Find the master of the session state * * @return master member of sessions
*/ protected Member findSessionMasterMember() {
Member mbr = null;
Member mbrs[] = cluster.getMembers(); if (mbrs.length != 0) {
mbr = mbrs[0];
} if (mbr == null && log.isWarnEnabled()) {
log.warn(sm.getString("deltaManager.noMasterMember", getName(), ""));
} if (mbr != null && log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.foundMasterMember", getName(), mbr));
} return mbr;
}
/** * Wait that cluster session state is transferred or timeout after 60 Sec With stateTransferTimeout == -1 wait that * backup is transferred (forever mode) * * @param beforeSendTime Start instant of the operation
*/ protectedvoid waitForSendAllSessions(long beforeSendTime) { long reqStart = System.currentTimeMillis(); long reqNow = reqStart; boolean isTimeout = false; if (getStateTransferTimeout() > 0) { // wait that state is transferred with timeout check do { try { Thread.sleep(100);
} catch (Exception sleep) { //
}
reqNow = System.currentTimeMillis();
isTimeout = ((reqNow - reqStart) > (1000L * getStateTransferTimeout()));
} while ((!getStateTransferred()) && (!isTimeout) && (!isNoContextManagerReceived()));
} else { if (getStateTransferTimeout() == -1) { // wait that state is transferred do { try { Thread.sleep(100);
} catch (Exception sleep) {
}
} while ((!getStateTransferred()) && (!isNoContextManagerReceived()));
reqNow = System.currentTimeMillis();
}
} if (isTimeout) {
counterNoStateTransferred++;
log.error(sm.getString("deltaManager.noSessionState", getName(), new Date(beforeSendTime), Long.valueOf(reqNow - beforeSendTime)));
} elseif (isNoContextManagerReceived()) { if (log.isWarnEnabled()) {
log.warn(sm.getString("deltaManager.noContextManager", getName(), new Date(beforeSendTime), Long.valueOf(reqNow - beforeSendTime)));
}
} else { if (log.isInfoEnabled()) {
log.info(sm.getString("deltaManager.sessionReceived", getName(), new Date(beforeSendTime), Long.valueOf(reqNow - beforeSendTime)));
}
}
}
/** * Stop this component and implement the requirements of * {@link org.apache.catalina.util.LifecycleBase#stopInternal()}. * * @exception LifecycleException if this component detects a fatal error that prevents this component from being * used
*/
@Override protectedsynchronizedvoid stopInternal() throws LifecycleException {
if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.stopped", getName()));
}
setState(LifecycleState.STOPPING);
// Expire all active sessions if (log.isInfoEnabled()) {
log.info(sm.getString("deltaManager.expireSessions", getName()));
}
Session sessions[] = findSessions(); for (Session value : sessions) {
DeltaSession session = (DeltaSession) value; if (!session.isValid()) { continue;
} try {
session.expire(true, isExpireSessionsOnShutdown());
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
}
}
// Require a new random number generator if we are restarted super.stopInternal();
}
/** * A message was received from another node, this is the callback method to implement if you are interested in * receiving replication messages. * * @param cmsg - the message received.
*/
@Override publicvoid messageDataReceived(ClusterMessage cmsg) { if (cmsg instanceof SessionMessage) {
SessionMessage msg = (SessionMessage) cmsg; switch (msg.getEventType()) { case SessionMessage.EVT_GET_ALL_SESSIONS: case SessionMessage.EVT_SESSION_CREATED: case SessionMessage.EVT_SESSION_EXPIRED: case SessionMessage.EVT_SESSION_ACCESSED: case SessionMessage.EVT_SESSION_DELTA: case SessionMessage.EVT_CHANGE_SESSION_ID: synchronized (receivedMessageQueue) { if (receiverQueue) {
receivedMessageQueue.add(msg); return;
}
} break; default: // we didn't queue, do nothing break;
} // switch
messageReceived(msg, msg.getAddress());
}
}
/** * When the request has been completed, the replication valve will notify the manager, and the manager will decide * whether any replication is needed or not. If there is a need for replication, the manager will create a session * message and that will be replicated. The cluster determines where it gets sent. * * @param sessionId - the sessionId that just completed. * * @return a SessionMessage to be sent,
*/
@Override public ClusterMessage requestCompleted(String sessionId) { return requestCompleted(sessionId, false);
}
/** * When the request has been completed, the replication valve will notify the manager, and the manager will decide * whether any replication is needed or not. If there is a need for replication, the manager will create a session * message and that will be replicated. The cluster determines where it gets sent. Session expiration also calls * this method, but with expires == true. * * @param sessionId - the sessionId that just completed. * @param expires - whether this method has been called during session expiration * * @return a SessionMessage to be sent,
*/ public ClusterMessage requestCompleted(String sessionId, boolean expires) {
DeltaSession session = null;
SessionMessage msg = null; try {
session = (DeltaSession) findSession(sessionId); if (session == null) { // A parallel request has called session.invalidate() which has // removed the session from the Manager. returnnull;
} if (session.isDirty()) {
counterSend_EVT_SESSION_DELTA++;
msg = new SessionMessageImpl(getName(), SessionMessage.EVT_SESSION_DELTA, session.getDiff(), sessionId,
sessionId + "-" + System.currentTimeMillis());
}
} catch (IOException x) {
log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest", sessionId), x); returnnull;
} if (msg == null) { if (!expires && !session.isPrimarySession()) {
counterSend_EVT_SESSION_ACCESSED++;
msg = new SessionMessageImpl(getName(), SessionMessage.EVT_SESSION_ACCESSED, null, sessionId,
sessionId + "-" + System.currentTimeMillis()); if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary", getName(), sessionId));
}
}
} else { // log only outside synch block! if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.createMessage.delta", getName(), sessionId));
}
} if (!expires) {
session.setPrimarySession(true);
} // check to see if we need to send out an access message if (!expires && (msg == null)) { long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated(); if (session.getMaxInactiveInterval() >= 0 && replDelta > (session.getMaxInactiveInterval() * 1000L)) {
counterSend_EVT_SESSION_ACCESSED++;
msg = new SessionMessageImpl(getName(), SessionMessage.EVT_SESSION_ACCESSED, null, sessionId,
sessionId + "-" + System.currentTimeMillis()); if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.createMessage.access", getName(), sessionId));
}
}
}
// update last replicated time if (msg != null) {
session.setLastTimeReplicated(System.currentTimeMillis());
msg.setTimestamp(session.getLastTimeReplicated());
} return msg;
}
/** * This method is called by the received thread when a SessionMessage has been received from one of the other nodes * in the cluster. * * @param msg - the message received * @param sender - the sender of the message, this is used if we receive a EVT_GET_ALL_SESSION message, so that we * only reply to the requesting node
*/ protectedvoid messageReceived(SessionMessage msg, Member sender) { Thread currentThread = Thread.currentThread();
ClassLoader contextLoader = currentThread.getContextClassLoader(); try {
/** * handle receive session is access at other node ( primary session is now false) * * @param msg Session message * @param sender Member which sent the message * * @throws IOException Propagated IO error
*/ protectedvoid handleSESSION_ACCESSED(SessionMessage msg, Member sender) throws IOException {
counterReceive_EVT_SESSION_ACCESSED++;
DeltaSession session = (DeltaSession) findSession(msg.getSessionID()); if (session != null) { if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.receiveMessage.accessed", getName(), msg.getSessionID()));
}
session.access();
session.setPrimarySession(false);
session.endAccess();
}
}
/** * handle receive session is expire at other node ( expire session also here) * * @param msg Session message * @param sender Member which sent the message * * @throws IOException Propagated IO error
*/ protectedvoid handleSESSION_EXPIRED(SessionMessage msg, Member sender) throws IOException {
counterReceive_EVT_SESSION_EXPIRED++;
DeltaSession session = (DeltaSession) findSession(msg.getSessionID()); if (session != null) { if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.receiveMessage.expired", getName(), msg.getSessionID()));
}
session.expire(notifySessionListenersOnReplication, false);
}
}
/** * handle receive new session is created at other node (create backup - primary false) * * @param msg Session message * @param sender Member which sent the message
*/ protectedvoid handleSESSION_CREATED(SessionMessage msg, Member sender) {
counterReceive_EVT_SESSION_CREATED++; if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.receiveMessage.createNewSession", getName(), msg.getSessionID()));
}
DeltaSession session = (DeltaSession) createEmptySession();
session.setValid(true);
session.setPrimarySession(false);
session.setCreationTime(msg.getTimestamp()); // use container maxInactiveInterval so that session will expire correctly // in case of primary transfer
session.setMaxInactiveInterval(getContext().getSessionTimeout() * 60, false);
session.access();
session.setId(msg.getSessionID(), notifySessionListenersOnReplication);
session.endAccess();
}
/** * handle receive sessions from other not ( restart ) * * @param msg Session message * @param sender Member which sent the message * * @throws ClassNotFoundException Serialization error * @throws IOException IO error with serialization
*/ protectedvoid handleALL_SESSION_DATA(SessionMessage msg, Member sender) throws ClassNotFoundException, IOException {
counterReceive_EVT_ALL_SESSION_DATA++; if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin", getName()));
} byte[] data = msg.getSession();
deserializeSessions(data); if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter", getName()));
} // stateTransferred = true;
}
/** * Handle a get all sessions message from another node. Depending on {@link #sendAllSessions}, sessions are either * sent in a single message or in batches. Sending is complete when this method exits. * * @param msg Session message * @param sender Member which sent the message * * @throws IOException IO error sending messages
*/ protectedvoid handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException {
counterReceive_EVT_GET_ALL_SESSIONS++; // get a list of all the session from this manager if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));
} // Write the number of active sessions, followed by the details // get all sessions and serialize without sync
Session[] currentSessions = findSessions(); long findSessionTimestamp = System.currentTimeMillis(); if (isSendAllSessions()) {
sendSessions(sender, currentSessions, findSessionTimestamp);
} else { // send sessions in batches int remain = currentSessions.length; for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) { int len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i :
getSendAllSessionsSize();
Session[] sendSessions = new Session[len];
System.arraycopy(currentSessions, i, sendSessions, 0, len);
sendSessions(sender, sendSessions, findSessionTimestamp);
remain = remain - len; if (getSendAllSessionsWaitTime() > 0 && remain > 0) { try { Thread.sleep(getSendAllSessionsWaitTime());
} catch (Exception sleep) {
}
}
}
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.