diff --git a/.gitignore b/.gitignore index 14c34b0..7d67421 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,8 @@ target /.project /ManagementTest_sctp.xml /sctptest.log + +# Ignore AppNGIN-generated build.xml and ivy.xml +build.xml +ivy.xml + diff --git a/pom.xml b/pom.xml old mode 100755 new mode 100644 diff --git a/sctp-api/src/main/java/org/mobicents/protocols/api/Management.java b/sctp-api/src/main/java/org/mobicents/protocols/api/Management.java index 560adf2..07154c7 100644 --- a/sctp-api/src/main/java/org/mobicents/protocols/api/Management.java +++ b/sctp-api/src/main/java/org/mobicents/protocols/api/Management.java @@ -274,6 +274,32 @@ public Server addServer(String serverName, String hostAddress, int port, IpChann public Association addServerAssociation(String peerAddress, int peerPort, String serverName, String assocName, IpChannelType ipChannelType) throws Exception; + /** + * Add server Association. IP channel type is SCTP. + * + * @param peerAddress + * the peer IP address that this association will accept + * connection from + * @param peerPort + * the peer port that this association will accept connection + * from + * @param serverName + * the Server that this association belongs to + * @param assocName + * unique name of Association + * @param ipChannelType + * IP channel type: SCTP or TCP + * + * @param extraHostAddresses + * When SCTP multi-homing configuration extra IP addresses can be put here + * If multi-homing absence this parameter can be null + * * @return + * @throws Exception + */ + public Association addServerAssociation(String peerAddress, int peerPort, String serverName, String assocName, IpChannelType ipChannelType, String[] extraHostAddresses) + throws Exception; + + /** * Add Association. IP channel type is SCTP. * diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/AssociationImpl.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/AssociationImpl.java index c1c6166..0e77629 100644 --- a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/AssociationImpl.java +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/AssociationImpl.java @@ -182,6 +182,30 @@ public AssociationImpl(String peerAddress, int peerPort, String serverName, Stri this.type = AssociationType.SERVER; } + + /** + * Creating a SERVER Association + * + * @param peerAddress + * @param peerPort + * @param serverName + * @param assocName + * @param ipChannelType + * @param extraHostAddresses + */ + public AssociationImpl(String peerAddress, int peerPort, String serverName, String assocName, + IpChannelType ipChannelType, String[] extraHostAddresses) { + this(); + this.peerAddress = peerAddress; + this.peerPort = peerPort; + this.serverName = serverName; + this.name = assocName; + this.ipChannelType = ipChannelType; + this.extraHostAddresses = extraHostAddresses; + + this.type = AssociationType.SERVER; + + } /** * Creating an ANONYMOUS_SERVER Association diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/ManagementImpl.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/ManagementImpl.java index 4f065e9..53bd067 100644 --- a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/ManagementImpl.java +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/ManagementImpl.java @@ -59,6 +59,7 @@ public class ManagementImpl implements Management { private static final Logger logger = Logger.getLogger(ManagementImpl.class); + private static final String DISABLE_CONFIG_PERSISTANCE_KEY = "ss7.disableDefaultConfigPersistance"; private static final String SCTP_PERSIST_DIR_KEY = "sctp.persist.dir"; private static final String USER_DIR_KEY = "user.dir"; private static final String PERSIST_FILE_NAME = "sctp.xml"; @@ -393,8 +394,16 @@ public boolean isStarted(){ return this.started; } + private boolean isConfigPersistanceDisabled() { + String disableConfigPersistanceString = System.getProperty(DISABLE_CONFIG_PERSISTANCE_KEY, "false"); + return Boolean.valueOf(disableConfigPersistanceString); + } + @SuppressWarnings("unchecked") public void load() throws FileNotFoundException { + if (isConfigPersistanceDisabled()) { + return; + } XMLObjectReader reader = null; try { reader = XMLObjectReader.newInstance(new FileInputStream(persistFile.toString())); @@ -438,6 +447,9 @@ public void load() throws FileNotFoundException { } public void store() { + if (isConfigPersistanceDisabled()) { + return; + } try { XMLObjectWriter writer = XMLObjectWriter.newInstance(new FileOutputStream(persistFile.toString())); writer.setBinding(binding); @@ -683,6 +695,8 @@ public AssociationImpl addServerAssociation(String peerAddress, int peerPort, St public AssociationImpl addServerAssociation(String peerAddress, int peerPort, String serverName, String assocName, IpChannelType ipChannelType) throws Exception { + return (AssociationImpl)addServerAssociation(peerAddress, peerPort, serverName, assocName, ipChannelType, null); + /* if (!this.started) { throw new Exception(String.format("Management=%s not started", this.name)); @@ -763,6 +777,95 @@ public AssociationImpl addServerAssociation(String peerAddress, int peerPort, St logger.info(String.format("Added Associoation=%s of type=%s", association.getName(), association.getAssociationType())); } + return association; + } + */ + } + + @Override + public Association addServerAssociation(String peerAddress, int peerPort, String serverName, String assocName, + IpChannelType ipChannelType, String[] extraHostAddresses) throws Exception { + // MADE BY KB + + if (!this.started) { + throw new Exception(String.format("Management=%s not started", this.name)); + } + + if (peerAddress == null) { + throw new Exception("Peer address cannot be null"); + } + + if (peerPort < 1) { + throw new Exception("Peer port cannot be less than 1"); + } + + if (serverName == null) { + throw new Exception("Server name cannot be null"); + } + + if (assocName == null) { + throw new Exception("Association name cannot be null"); + } + + synchronized (this) { + if (this.associations.get(assocName) != null) { + throw new Exception(String.format("Already has association=%s", assocName)); + } + + Server server = null; + + for (FastList.Node n = this.servers.head(), end = this.servers.tail(); (n = n.getNext()) != end;) { + Server serverTemp = n.getValue(); + if (serverTemp.getName().equals(serverName)) { + server = serverTemp; + } + } + + if (server == null) { + throw new Exception(String.format("No Server found for name=%s", serverName)); + } + + for (FastMap.Entry n = this.associations.head(), end = this.associations.tail(); (n = n.getNext()) != end;) { + Association associationTemp = n.getValue(); + + if (peerAddress.equals(associationTemp.getPeerAddress()) && associationTemp.getPeerPort() == peerPort) { + throw new Exception(String.format("Already has association=%s with same peer address=%s and port=%d", associationTemp.getName(), + peerAddress, peerPort)); + } + } + + if (server.getIpChannelType() != ipChannelType) + throw new Exception(String.format("Server and Accociation has different IP channel type")); + + AssociationImpl association = new AssociationImpl(peerAddress, peerPort, serverName, assocName, ipChannelType, extraHostAddresses); + association.setManagement(this); + + AssociationMap newAssociations = new AssociationMap(); + newAssociations.putAll(this.associations); + newAssociations.put(assocName, association); + this.associations = newAssociations; + // this.associations.put(assocName, association); + + FastList newAssociations2 = new FastList(); + newAssociations2.addAll(((ServerImpl) server).associations); + newAssociations2.add(assocName); + ((ServerImpl) server).associations = newAssociations2; + // ((ServerImpl) server).associations.add(assocName); + + this.store(); + + for (ManagementEventListener lstr : managementEventListeners) { + try { + lstr.onAssociationAdded(association); + } catch (Throwable ee) { + logger.error("Exception while invoking onAssociationAdded", ee); + } + } + + if (logger.isInfoEnabled()) { + logger.info(String.format("Added Associoation=%s of type=%s", association.getName(), association.getAssociationType())); + } + return association; } } @@ -845,6 +948,26 @@ public AssociationImpl addAssociation(String hostAddress, int hostPort, String p } } + public Association getAssociation(String peerAddress, int peerPort) throws Exception { + if (peerAddress == null || peerPort == 0) { + throw new Exception("PeerIp is null or peerPort is zero!"); + } + Association associationToFind = null; + for (FastMap.Entry n = associations.head(), end = associations.tail(); (n = n.getNext()) != end;) { + Association associationTemp = n.getValue(); + if (associationTemp.getPeerAddress().equals(peerAddress) && associationTemp.getPeerPort() == peerPort) { + associationToFind = associationTemp; + } + } + + + if (associationToFind == null) { + throw new Exception(String.format("No Association found based on PeerAddress=%s and PeerPort=%2", peerAddress, peerPort)); + } + return associationToFind; + } + + public Association getAssociation(String assocName) throws Exception { if (assocName == null) { throw new Exception("Association name cannot be null"); diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/SelectorThread.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/SelectorThread.java index a4af7ca..189c9e9 100644 --- a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/SelectorThread.java +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/SelectorThread.java @@ -222,8 +222,10 @@ private void doAccept(AbstractSelectableChannel serverSocketChannel, AbstractSel // server selection for (Server srv : this.management.servers) { ServerImpl srvv = (ServerImpl) srv; + logger.debug(String.format("*** Server: %s", srvv)); if (srvv.getIpChannel() == serverSocketChannel) { // we have found a server for (SocketAddress sockAdd : peerAddresses) { + logger.debug(String.format("****** Socket Address: %s", sockAdd)); inetAddress = ((InetSocketAddress) sockAdd).getAddress(); port = ((InetSocketAddress) sockAdd).getPort(); @@ -239,7 +241,7 @@ private void doAccept(AbstractSelectableChannel serverSocketChannel, AbstractSel for (FastMap.Entry n = associations.head(), end = associations.tail(); (n = n.getNext()) != end && !provisioned;) { AssociationImpl association = (AssociationImpl)n.getValue(); - + logger.debug(String.format("********* Association: %s", association)); // check if an association binds to the found server if (srv.getName().equals(association.getServerName())) { @@ -318,6 +320,28 @@ private void doAccept(AbstractSelectableChannel serverSocketChannel, AbstractSel } return; } + // changes begin + // get anonymAssociation created + try { + AssociationImpl tmpAssociation = null; + String assocByName = anonymAssociation.getPeerAddress()+":"+anonymAssociation.getPeerPort(); + try { + tmpAssociation = (AssociationImpl)this.management.getAssociation(assocByName); + } catch (Exception ex) { + logger.debug(String.format("Could not find association based on name: %s. We will try to find based on address and port...", assocByName)); + } + if (tmpAssociation == null) { + tmpAssociation = (AssociationImpl)this.management.getAssociation(anonymAssociation.getPeerAddress(), anonymAssociation.getPeerPort()); + } + if (tmpAssociation != null) { + tmpAssociation.setSocketChannel(socketChannel); + tmpAssociation.setManagement(this.management); + anonymAssociation = tmpAssociation; + } + } catch (Exception e) { + logger.error(String.format("Rejected anonymous %s", anonymAssociation), e); + } + // changes - end if (!anonymAssociation.isStarted()) { // connection is rejected @@ -338,7 +362,8 @@ private void doAccept(AbstractSelectableChannel serverSocketChannel, AbstractSel if (logger.isInfoEnabled()) { logger.info(String.format("Accepted anonymous %s", anonymAssociation)); } - + + if (anonymAssociation.getIpChannelType() == IpChannelType.TCP) { AssocChangeEvent ace = AssocChangeEvent.COMM_UP; AssociationChangeNotification2 acn = new AssociationChangeNotification2(ace); diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/ServerImpl.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/ServerImpl.java index 54c0390..2d23e11 100644 --- a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/ServerImpl.java +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/ServerImpl.java @@ -40,6 +40,7 @@ import org.mobicents.protocols.api.Server; import com.sun.nio.sctp.SctpServerChannel; +import com.sun.nio.sctp.SctpStandardSocketOptions; /** * @author amit bhayani @@ -177,6 +178,8 @@ private void doInitSocketSctp() throws IOException { // Create a new non-blocking server socket channel this.serverChannelSctp = SctpServerChannel.open(); this.serverChannelSctp.configureBlocking(false); + //KB: + this.serverChannelSctp.setOption(SctpStandardSocketOptions.SCTP_INIT_MAXSTREAMS, SctpStandardSocketOptions.InitMaxStreams.create(17, 17)); // Bind the server socket to the specified address and port InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.hostport); diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/ManageableAssociation.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/ManageableAssociation.java new file mode 100644 index 0000000..97477d4 --- /dev/null +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/ManageableAssociation.java @@ -0,0 +1,349 @@ +package org.mobicents.protocols.sctp.multiclient; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.spi.AbstractSelectableChannel; + +import javolution.xml.XMLFormat; +import javolution.xml.stream.XMLStreamException; + +import org.apache.log4j.Logger; +import org.mobicents.protocols.api.Association; +import org.mobicents.protocols.api.PayloadData; + +/** + * Abstract super class for associations. It represents an SCTP association with an interface which provides management + * functionality like: start, stop, reconnect. It also defines static classes to describe associations: PeerAddressInfo, + * HostAddressInfo, AssociationInfo used by other classes to identify and compare association objects. + * + * @author balogh.gabor@alerant.hu + * + */ + +public abstract class ManageableAssociation implements Association { + protected static final Logger logger = Logger.getLogger(ManageableAssociation.class.getName()); + + private static final String NAME = "name"; + private static final String SERVER_NAME = "serverName"; + private static final String HOST_ADDRESS = "hostAddress"; + private static final String HOST_PORT = "hostPort"; + + private static final String PEER_ADDRESS = "peerAddress"; + private static final String SECONDARY_PEER_ADDRESS = "secondaryPeerAddress"; + private static final String PEER_PORT = "peerPort"; + + private static final String EXTRA_HOST_ADDRESS = "extraHostAddress"; + private static final String EXTRA_HOST_ADDRESS_SIZE = "extraHostAddresseSize"; + + protected MultiManagementImpl management; + protected String hostAddress; + protected int hostPort; + protected String peerAddress; + protected int peerPort; + protected String name; + protected String[] extraHostAddresses; + protected String secondaryPeerAddress; + protected AssociationInfo assocInfo; + + /** + * If association can't start it tries to send INIT to the secondary peer address. + * It alternates between the two peer addresses until the connection is established. + */ + protected SocketAddress initSocketAddress; + protected SocketAddress primaryPeerSocketAddress; + protected SocketAddress secondaryPeerSocketAddress; + + // payload data used in the first dummy message which initiate the connect procedure + protected PayloadData initPayloadData = new PayloadData(0, new byte[1], true, false, 0, 0); + + /** + * This is used only for SCTP This is the socket address. If the Association has multihome support and if peer address + * changes, this variable is set to new value so new messages are now sent to changed peer address + */ + protected volatile SocketAddress peerSocketAddress = null; + + protected abstract void start() throws Exception; + + protected abstract void stop() throws Exception; + + protected abstract AbstractSelectableChannel getSocketChannel(); + + protected abstract void close(); + + protected abstract void reconnect(); + + protected abstract boolean writePayload(PayloadData payloadData, boolean initMsg); + + protected abstract void readPayload(PayloadData payloadData); + + protected ManageableAssociation() { + + } + + protected ManageableAssociation(String hostAddress, int hostPort, String peerAddress, int peerPort, String assocName, + String[] extraHostAddresses) throws IOException { + this(hostAddress, hostPort, peerAddress, peerPort, assocName, extraHostAddresses, null); + } + + protected ManageableAssociation(String hostAddress, int hostPort, String peerAddress, int peerPort, String assocName, + String[] extraHostAddresses, String secondaryPeerAddress) throws IOException { + this.hostAddress = hostAddress; + this.hostPort = hostPort; + this.peerAddress = peerAddress; + this.peerPort = peerPort; + this.name = assocName; + this.extraHostAddresses = extraHostAddresses; + this.secondaryPeerAddress = secondaryPeerAddress; + initDerivedFields(); + } + + protected void initDerivedFields() throws IOException { + this.primaryPeerSocketAddress = new InetSocketAddress(InetAddress.getByName(peerAddress), peerPort); + this.peerSocketAddress = this.primaryPeerSocketAddress; + this.initSocketAddress = this.primaryPeerSocketAddress; + if (secondaryPeerAddress != null && !secondaryPeerAddress.isEmpty()) { + this.secondaryPeerSocketAddress = new InetSocketAddress(InetAddress.getByName(secondaryPeerAddress), peerPort); + } else { + this.secondaryPeerAddress = null; + } + String secondaryHostAddress = null; + if (extraHostAddresses != null && extraHostAddresses.length >= 1) { + secondaryHostAddress = extraHostAddresses[0]; + } + this.assocInfo = new AssociationInfo(new PeerAddressInfo(primaryPeerSocketAddress, secondaryPeerSocketAddress), + new HostAddressInfo(hostAddress, secondaryHostAddress, hostPort)); + } + + protected void setManagement(MultiManagementImpl management) { + this.management = management; + } + + protected AssociationInfo getAssocInfo() { + return assocInfo; + } + + protected void setAssocInfo(AssociationInfo assocInfo) { + this.assocInfo = assocInfo; + } + + protected void assignSctpAssociationId(int id) { + this.assocInfo.getPeerInfo().setSctpAssocId(id); + } + + protected boolean isConnectedToPeerAddresses(String peerAddresses) { + PeerAddressInfo peer = getAssocInfo().getPeerInfo(); + if (peerAddresses.contains(peer.getPeerSocketAddress().toString())) { + return true; + } else if (peer.getSecondaryPeerSocketAddress() != null) { + if (peerAddresses.contains(peer.getSecondaryPeerSocketAddress().toString())) { + return true; + } + } + return false; + } + + protected PayloadData getInitPayloadData() { + return initPayloadData; + } + + protected void setInitPayloadData(PayloadData initPayloadData) { + this.initPayloadData = initPayloadData; + } + + void switchInitSocketAddress() { + if (logger.isDebugEnabled()) { + logger.debug("switchInitSocketAddress() - enter: initSocketAddress=" + this.initSocketAddress + ", secondaryPeerSocketAddress=" + secondaryPeerSocketAddress); + } + if (this.secondaryPeerSocketAddress != null) { + this.initSocketAddress = (this.initSocketAddress == this.secondaryPeerSocketAddress ? this.primaryPeerSocketAddress : this.secondaryPeerSocketAddress); + } + } + + static class PeerAddressInfo { + protected SocketAddress peerSocketAddress; + protected SocketAddress secondaryPeerSocketAddress; + protected int sctpAssocId; + + public PeerAddressInfo(SocketAddress peerSocketAddress, SocketAddress secondaryPeerSocketAddress) { + super(); + this.peerSocketAddress = peerSocketAddress; + this.secondaryPeerSocketAddress = secondaryPeerSocketAddress; + } + + public SocketAddress getPeerSocketAddress() { + return peerSocketAddress; + } + + public SocketAddress getSecondaryPeerSocketAddress() { + return secondaryPeerSocketAddress; + } + + public int getSctpAssocId() { + return sctpAssocId; + } + + protected void setPeerSocketAddress(SocketAddress peerSocketAddress) { + this.peerSocketAddress = peerSocketAddress; + } + + protected void setSecondaryPeerSocketAddress(SocketAddress secondaryPeerSocketAddress) { + this.secondaryPeerSocketAddress = secondaryPeerSocketAddress; + } + + protected void setSctpAssocId(int sctpAssocId) { + this.sctpAssocId = sctpAssocId; + } + + @Override + public String toString() { + return "PeerAddressInfo [peerSocketAddress=" + peerSocketAddress + ", secondaryPeerSocketAddress=" + + secondaryPeerSocketAddress + ", sctpAssocId=" + sctpAssocId + "]"; + } + } + + static class HostAddressInfo { + private final String primaryHostAddress; + private final String secondaryHostAddress; + private final int hostPort; + + public HostAddressInfo(String primaryHostAddress, String secondaryHostAddress, int hostPort) { + super(); + if (primaryHostAddress == null || primaryHostAddress.isEmpty()) { + throw new IllegalArgumentException("Constructor HostAddressInfo: primaryHostAddress can not be null!"); + } + this.primaryHostAddress = primaryHostAddress; + this.secondaryHostAddress = secondaryHostAddress; + this.hostPort = hostPort; + } + + public String getPrimaryHostAddress() { + return primaryHostAddress; + } + + public String getSecondaryHostAddress() { + return secondaryHostAddress; + } + + public int getHostPort() { + return hostPort; + } + + public boolean matches(HostAddressInfo hostAddressInfo) { + if (hostAddressInfo == null) { + return false; + } + if (this.hostPort != hostAddressInfo.getHostPort()) { + return false; + } + if (this.equals(hostAddressInfo)) { + return true; + } + if (this.getPrimaryHostAddress().equals(hostAddressInfo.getPrimaryHostAddress()) + || this.getPrimaryHostAddress().equals(hostAddressInfo.getSecondaryHostAddress())) { + return true; + } + if (this.getSecondaryHostAddress() != null && !this.getSecondaryHostAddress().isEmpty()) { + if (this.getSecondaryHostAddress().equals(hostAddressInfo.getPrimaryHostAddress()) + || this.getSecondaryHostAddress().equals(hostAddressInfo.getSecondaryHostAddress())) { + return true; + } + } + return false; + } + + @Override + public String toString() { + return "HostAddressInfo [primaryHostAddress=" + primaryHostAddress + ", secondaryHostAddress=" + + secondaryHostAddress + ", hostPort=" + hostPort + "]"; + } + + } + + static class AssociationInfo { + protected PeerAddressInfo peerInfo; + protected HostAddressInfo hostInfo; + + public PeerAddressInfo getPeerInfo() { + return peerInfo; + } + + public HostAddressInfo getHostInfo() { + return hostInfo; + } + + @Override + public String toString() { + return "AssociationInfo [peerInfo=" + peerInfo + ", hostInfo=" + hostInfo + "]"; + } + + public AssociationInfo(PeerAddressInfo peerInfo, HostAddressInfo hostInfo) { + super(); + this.peerInfo = peerInfo; + this.hostInfo = hostInfo; + } + + protected void setPeerInfo(PeerAddressInfo peerInfo) { + this.peerInfo = peerInfo; + } + + protected void setHostInfo(HostAddressInfo hostInfo) { + this.hostInfo = hostInfo; + } + } + + /** + * XML Serialization/Deserialization + */ + protected static final XMLFormat ASSOCIATION_XML = new XMLFormat( + ManageableAssociation.class) { + + @Override + public void read(javolution.xml.XMLFormat.InputElement xml, ManageableAssociation association) + throws XMLStreamException { + association.name = xml.getAttribute(NAME, ""); + association.hostAddress = xml.getAttribute(HOST_ADDRESS, ""); + association.hostPort = xml.getAttribute(HOST_PORT, 0); + + association.peerAddress = xml.getAttribute(PEER_ADDRESS, ""); + association.peerPort = xml.getAttribute(PEER_PORT, 0); + association.secondaryPeerAddress = xml.getAttribute(SECONDARY_PEER_ADDRESS, null); + + int extraHostAddressesSize = xml.getAttribute(EXTRA_HOST_ADDRESS_SIZE, 0); + association.extraHostAddresses = new String[extraHostAddressesSize]; + + for (int i = 0; i < extraHostAddressesSize; i++) { + association.extraHostAddresses[i] = xml.get(EXTRA_HOST_ADDRESS, String.class); + } + + } + + @Override + public void write(ManageableAssociation association, javolution.xml.XMLFormat.OutputElement xml) + throws XMLStreamException { + xml.setAttribute(NAME, association.name); + // xml.setAttribute(ASSOCIATION_TYPE, association.type.getType()); + xml.setAttribute(HOST_ADDRESS, association.hostAddress); + xml.setAttribute(HOST_PORT, association.hostPort); + + xml.setAttribute(PEER_ADDRESS, association.peerAddress); + xml.setAttribute(PEER_PORT, association.peerPort); + + if (association.secondaryPeerAddress != null && !association.secondaryPeerAddress.isEmpty()) { + xml.setAttribute(SECONDARY_PEER_ADDRESS, association.secondaryPeerAddress); + } + + xml.setAttribute(SERVER_NAME, ""); + // xml.setAttribute(IPCHANNEL_TYPE, IpChannelType.SCTP); + + xml.setAttribute(EXTRA_HOST_ADDRESS_SIZE, + association.extraHostAddresses != null ? association.extraHostAddresses.length : 0); + if (association.extraHostAddresses != null) { + for (String s : association.extraHostAddresses) { + xml.add(s, EXTRA_HOST_ADDRESS, String.class); + } + } + } + }; +} diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiAssociationHandler.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiAssociationHandler.java new file mode 100644 index 0000000..fbfe19f --- /dev/null +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiAssociationHandler.java @@ -0,0 +1,152 @@ +/* + * JBoss, Home of Professional Open Source + * Copyright 2011, Red Hat, Inc. and/or its affiliates, and individual + * contributors as indicated by the @authors tag. All rights reserved. + * See the copyright.txt in the distribution for a full listing + * of individual contributors. + * + * This copyrighted material is made available to anyone wishing to use, + * modify, copy, or redistribute it subject to the terms and conditions + * of the GNU General Public License, v. 2.0. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License, + * v. 2.0 along with this distribution; if not, write to the Free + * Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + * MA 02110-1301, USA. + */ +package org.mobicents.protocols.sctp.multiclient; + +import org.apache.log4j.Logger; + +import com.sun.nio.sctp.AbstractNotificationHandler; +import com.sun.nio.sctp.AssociationChangeNotification; +import com.sun.nio.sctp.HandlerResult; +import com.sun.nio.sctp.Notification; +import com.sun.nio.sctp.PeerAddressChangeNotification; +import com.sun.nio.sctp.SendFailedNotification; +import com.sun.nio.sctp.ShutdownNotification; + +/** + * Implements NotificationHandler for the OneToManyAssocMultiplexer class. Its main responsibility is to delegate notifications + * to the NotificationHandler of the corresponding OneToManyAssociationImpl. + * + * @author balogh.gabor@alerant.hu + * + */ +@SuppressWarnings("restriction") +class MultiAssociationHandler extends AbstractNotificationHandler { + + private static final Logger logger = Logger.getLogger(MultiAssociationHandler.class); + + public MultiAssociationHandler() { + + } + + /** + * The delegateNotificationHandling method resolves the OneToManyAssociationImpl instance which belongs to the given + * Notification and calls the handleNotification method of the resolved Association. In case the association instance can + * not be resolved the method returns the value of the defaultResult parameter. + * + * @param not - Notification that need to be delegated + * @param defaultResult - Default return value used when Association instance cannot be resolved + * @param multiplexer - The OneToManyAssocMultiplexer that is attached to this MultiAssociationHandler instance + * @return - If delegation is successful it returns the return result of the handler method of the corresponding + * OneToManyAssociationImpl instance otherwise ű returns the value of the defaultResult parameter. + */ + private HandlerResult delegateNotificationHandling(Notification not, HandlerResult defaultResult, + OneToManyAssocMultiplexer multiplexer) { + ManageableAssociation mAssociation = multiplexer.resolveAssociationImpl(not.association()); + + if (mAssociation == null) { + return defaultResult; + } + if (mAssociation instanceof OneToManyAssociationImpl) { + + OneToManyAssociationImpl association = (OneToManyAssociationImpl) mAssociation; + if (logger.isDebugEnabled()) { + logger.debug("Notification: " + not + " is being delagated to associationHandler: " + + association.associationHandler); + } + return association.associationHandler.handleNotification(not, association); + } else if (mAssociation instanceof OneToOneAssociationImpl) { + OneToOneAssociationImpl association = (OneToOneAssociationImpl) mAssociation; + if (logger.isDebugEnabled()) { + logger.debug("Notification: " + not + " is being delagated to associationHandler: " + + association.associationHandler); + } + return association.associationHandler.handleNotification(not, association); + } + logger.warn(String.format("Unexpected super type: %s", mAssociation.getClass())); + return defaultResult; + } + + @Override + public HandlerResult handleNotification(AssociationChangeNotification not, OneToManyAssocMultiplexer multiplexer) { + if (logger.isDebugEnabled()) { + logger.debug("handleNotification(AssociationChangeNotification=" + not + ", OneToManyAssocMultiplexer=" + + multiplexer + ") is called"); + } + + if (not.association() == null) { + logger.error("Cannot handle AssociationChangeNotification: association method of AssociationChangeNotification: " + + not + " returns null value, handler returns CONTINUE"); + return HandlerResult.CONTINUE; + } + return delegateNotificationHandling(not, HandlerResult.CONTINUE, multiplexer); + } + + @Override + public HandlerResult handleNotification(ShutdownNotification not, OneToManyAssocMultiplexer multiplexer) { + if (logger.isDebugEnabled()) { + logger.debug("handleNotification(ShutdownNotification not, OneToManyAssocMultiplexer multiplexer) is called"); + } + if (not.association() == null) { + logger.error("Cannot handle ShutdownNotification: assoction method of ShutdownNotification: " + not + + " returns null value, handler returns RETURN"); + return HandlerResult.RETURN; + } + return delegateNotificationHandling(not, HandlerResult.RETURN, multiplexer); + } + + @Override + public HandlerResult handleNotification(SendFailedNotification notification, OneToManyAssocMultiplexer multiplexer) { + if (logger.isDebugEnabled()) { + logger.debug( + "handleNotification(SendFailedNotification notification, OneToManyAssocMultiplexer multiplexer) is called not=" + notification + " multiplexer=" + multiplexer); + } + ManageableAssociation assoc = multiplexer.findPendingAssociationByAddress(notification.address()); + if (assoc == null) { + logger.warn("Cannot handle sendfailed notification: no pending manageable association found for address=" + + notification.address() + " by the multiplexer"); + return HandlerResult.RETURN; + } + // delegate notification + if (assoc instanceof OneToManyAssociationImpl) { + OneToManyAssociationImpl oneToManyAssoc = (OneToManyAssociationImpl) assoc; + return oneToManyAssoc.associationHandler.handleNotification(notification, oneToManyAssoc); + } else if (assoc instanceof OneToOneAssociationImpl) { + OneToOneAssociationImpl oneToOneAssoc = (OneToOneAssociationImpl) assoc; + return oneToOneAssoc.associationHandler.handleNotification(notification, oneToOneAssoc); + } + return HandlerResult.RETURN; + } + + @Override + public HandlerResult handleNotification(PeerAddressChangeNotification notification, OneToManyAssocMultiplexer multiplexer) { + if (logger.isDebugEnabled()) { + logger.debug( + "handleNotification(PeerAddressChangeNotification notification, OneToManyAssocMultiplexer multiplexer) is called"); + } + if (notification.association() == null) { + logger.error("Cannot handle PeerAddressChangeNotification: assoction method of PeerAddressChangeNotification: " + + notification + " returns null value, handler returns RETURN"); + return HandlerResult.RETURN; + } + return delegateNotificationHandling(notification, HandlerResult.RETURN, multiplexer); + } +} diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiChangeRequest.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiChangeRequest.java new file mode 100644 index 0000000..0c3c633 --- /dev/null +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiChangeRequest.java @@ -0,0 +1,140 @@ +/* + * JBoss, Home of Professional Open Source + * Copyright 2011, Red Hat, Inc. and/or its affiliates, and individual + * contributors as indicated by the @authors tag. All rights reserved. + * See the copyright.txt in the distribution for a full listing + * of individual contributors. + * + * This copyrighted material is made available to anyone wishing to use, + * modify, copy, or redistribute it subject to the terms and conditions + * of the GNU General Public License, v. 2.0. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License, + * v. 2.0 along with this distribution; if not, write to the Free + * Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + * MA 02110-1301, USA. + */ +package org.mobicents.protocols.sctp.multiclient; + +import java.nio.channels.spi.AbstractSelectableChannel; + +/** + * @author amit bhayani + * @author balogh.gabor@alerant.hu + * + */ +public final class MultiChangeRequest { + public static final int REGISTER = 1; + public static final int CHANGEOPS = 2; + public static final int CONNECT = 3; + public static final int CLOSE = 4; + public static final int ADD_OPS = 5; + + private final int type; + private final int ops; + private final AbstractSelectableChannel socketChannel; + private final OneToManyAssocMultiplexer assocMultiplexer; + private final ManageableAssociation association; + + private final boolean multiAssocRequest; + + private long executionTime; + + protected MultiChangeRequest(AbstractSelectableChannel socketChannel, OneToManyAssocMultiplexer assocMultiplexer, + ManageableAssociation association, int type, int ops) { + if (assocMultiplexer != null && association != null) { + throw new IllegalArgumentException( + "MultiChangeRequest can not be instatiated because of ambiougos arguments: both assocMultiplexer and association are specified!"); + } + if (assocMultiplexer == null && association == null) { + throw new IllegalArgumentException( + "MultiChangeRequest can not be instatiated because of ambiougos arguments: nor assocMultiplexer nor association are specified!"); + } + this.type = type; + this.ops = ops; + + if (assocMultiplexer != null) { + this.assocMultiplexer = assocMultiplexer; + this.association = null; + this.multiAssocRequest = true; + if (socketChannel == null) { + this.socketChannel = assocMultiplexer.getSocketMultiChannel(); + } else { + this.socketChannel = socketChannel; + } + } else { + this.association = association; + this.assocMultiplexer = null; + this.multiAssocRequest = false; + if (socketChannel == null) { + this.socketChannel = association.getSocketChannel(); + } else { + this.socketChannel = socketChannel; + } + } + } + + protected MultiChangeRequest(OneToManyAssocMultiplexer assocMultiplexer, ManageableAssociation association, int type, + long executionTime) { + this(null, assocMultiplexer, association, type, -1); + this.executionTime = executionTime; + } + + /** + * @return the type + */ + protected int getType() { + return type; + } + + /** + * @return the ops + */ + protected int getOps() { + return ops; + } + + /** + * @return the socketChannel + */ + protected AbstractSelectableChannel getSocketChannel() { + return socketChannel; + } + + /** + * @return the one-to-many multiplexer instance + */ + protected OneToManyAssocMultiplexer getAssocMultiplexer() { + return assocMultiplexer; + } + + /** + * @return the one-to-one association + */ + protected ManageableAssociation getAssociation() { + return association; + } + + protected boolean isMultiAssocRequest() { + return multiAssocRequest; + } + + /** + * @return the executionTime + */ + protected long getExecutionTime() { + return executionTime; + } + + @Override + public String toString() { + return "MultiChangeRequest [type=" + type + ", ops=" + ops + ", socketChannel=" + socketChannel + ", assocMultiplexer=" + + assocMultiplexer + ", oneToOneAssoc=" + association + ", multiAssocRequest=" + multiAssocRequest + + ", executionTime=" + executionTime + "]"; + } +} diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiChannelController.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiChannelController.java new file mode 100644 index 0000000..fe0b503 --- /dev/null +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiChannelController.java @@ -0,0 +1,101 @@ +package org.mobicents.protocols.sctp.multiclient; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.log4j.Logger; + +/** + * Stores and manages OneToManyAssocMultiplexer and ManageableAssociation objects of a SCTP stack (MultiManagementImpl + * instance). + * + * @author balogh.gabor@alerant.hu + * + */ +public class MultiChannelController { + private static final Logger logger = Logger.getLogger(MultiChannelController.class); + + private MultiManagementImpl management; + + private final HashMap> multiplexers = new HashMap>(); + + public MultiChannelController(MultiManagementImpl management) { + super(); + this.management = management; + } + + private synchronized OneToManyAssocMultiplexer findMultiplexerByHostAddrInfo(OneToManyAssociationImpl.HostAddressInfo hostAddressInfo) { + OneToManyAssocMultiplexer ret = null; + if (logger.isDebugEnabled()) { + logger.debug("Number of multiplexers: " + multiplexers.size()); + } + ArrayList mList = multiplexers.get(hostAddressInfo.getHostPort()); + if (mList == null || mList.isEmpty()) { + if (logger.isDebugEnabled()) { + logger.debug("No multiplexers found for local port: " + hostAddressInfo.getHostPort()); + } + } else { + for (OneToManyAssocMultiplexer am : mList) { + if (am.getHostAddressInfo().matches(hostAddressInfo)) { + ret = am; + } + } + } + if (logger.isDebugEnabled()) { + logger.debug("findMultiplexerByHostAddr: " + hostAddressInfo + " returns: " + ret); + } + return ret; + } + + private synchronized void storeMultiplexer(OneToManyAssociationImpl.HostAddressInfo hostAddrInfo, + OneToManyAssocMultiplexer multiplexer) { + ArrayList mList = multiplexers.get(hostAddrInfo.getHostPort()); + if (mList == null) { + mList = new ArrayList(); + multiplexers.put(hostAddrInfo.getHostPort(), mList); + } + mList.add(multiplexer); + } + + /** + * Using the host address information of the given OneToManyAssociationImpl finds the appropriate multiplexer instance and + * register it. If the multiplexer instance does not exists it is created by the method. + * + * @param assocImpl - OneToManyAssociation instance need to be registered to the appropriate OneToManyAssocMultiplexer + * @return - the OneToManyAssocMultiplexer that is associated to the OneToManyAssociationImpl assocImpl + * @throws IOException + */ + protected synchronized OneToManyAssocMultiplexer register(ManageableAssociation assocImpl) throws IOException { + if (assocImpl == null || assocImpl.getAssocInfo() == null || assocImpl.getAssocInfo().getHostInfo() == null) { + logger.error("Unable to register association=" + assocImpl); + return null; + } + if (logger.isDebugEnabled()) { + logger.debug("register: " + assocImpl); + } + OneToManyAssocMultiplexer ret = null; + ret = findMultiplexerByHostAddrInfo(assocImpl.getAssocInfo().getHostInfo()); + if (ret == null) { + ret = new OneToManyAssocMultiplexer(assocImpl.getAssocInfo().getHostInfo(), management); + storeMultiplexer(assocImpl.getAssocInfo().getHostInfo(), ret); + } + ret.registerAssociation(assocImpl); + return ret; + } + + protected synchronized void stopAllMultiplexers() { + for (List mList : multiplexers.values()) { + for (OneToManyAssocMultiplexer multiplexer : mList) { + try { + multiplexer.stop(); + } catch (IOException e) { + logger.warn(e); + } + } + } + multiplexers.clear(); + } + +} diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiManagementImpl.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiManagementImpl.java new file mode 100644 index 0000000..8329465 --- /dev/null +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiManagementImpl.java @@ -0,0 +1,793 @@ +/* + * TeleStax, Open Source Cloud Communications Copyright 2012. + * and individual contributors + * by the @authors tag. See the copyright.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ + +package org.mobicents.protocols.sctp.multiclient; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javolution.text.TextBuilder; +import javolution.util.FastList; +import javolution.util.FastMap; +import javolution.xml.XMLObjectReader; +import javolution.xml.XMLObjectWriter; +import javolution.xml.stream.XMLStreamException; + +import org.apache.log4j.Logger; +import org.mobicents.protocols.api.Association; +import org.mobicents.protocols.api.IpChannelType; +import org.mobicents.protocols.api.Management; +import org.mobicents.protocols.api.ManagementEventListener; +import org.mobicents.protocols.api.PayloadData; +import org.mobicents.protocols.api.Server; +import org.mobicents.protocols.api.ServerListener; +import org.mobicents.protocols.sctp.AssociationMap; + +/** + * This class is a partial implementation of the Management interface of the Mobicents SCTP-API. It is partial because it does + * not support the whole functionality of the interface instead it extends the capabilities of the implementation provided by + * org.mobicents.protocols.sctp package with the capability to use One-To-Many type SCTP client associations. + * + * Therefore the following functionality is not supported by this class: server type associations TCP ipChannelType + * + * @author amit bhayani + * @author balogh.gabor@alerant.hu + */ +public class MultiManagementImpl implements Management { + + private static final Logger logger = Logger.getLogger(MultiManagementImpl.class); + + private static final String DISABLE_CONFIG_PERSISTANCE_KEY = "ss7.disableDefaultConfigPersistance"; + private static final String ENABLE_SCTP_ASSOC_BRANCHING = "sctp.enableBranching"; + private static final String SCTP_PERSIST_DIR_KEY = "sctp.persist.dir"; + private static final String USER_DIR_KEY = "user.dir"; + private static final String PERSIST_FILE_NAME = "sctp.xml"; + private static final String ASSOCIATIONS = "associations"; + + private static final String CONNECT_DELAY_PROP = "connectdelay"; + + private final TextBuilder persistFile = TextBuilder.newInstance(); + + protected final MultiSctpXMLBinding binding; + protected static final String TAB_INDENT = "\t"; + private static final String CLASS_ATTRIBUTE = "type"; + private static final AtomicInteger WORKER_POOL_INDEX = new AtomicInteger(0); + + private final String name; + + protected String persistDir = null; + + protected AssociationMap associations = new AssociationMap(); + + private FastList pendingChanges = new FastList(); + + // Create a new selector + private Selector socketSelector = null; + + private MultiSelectorThread selectorThread = null; + + static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; + + private int workerThreads = DEFAULT_IO_THREADS; + + private boolean singleThread = true; + + private int workerThreadCount = 0; + + // Maximum IO Errors tolerated by Socket. After this the Socket will be + // closed and attempt will be made to open again + private int maxIOErrors = 3; + + private int connectDelay = 5000; + + private ExecutorService[] executorServices = null; + + private FastList managementEventListeners = new FastList(); + + private volatile boolean started = false; + + private final MultiChannelController multiChannelController = new MultiChannelController(this); + + private boolean enableBranching; + + // default value of the dummy message sent to initiate the SCTP connection + private PayloadData initPayloadData = new PayloadData(0, new byte[1], true, false, 0, 0); + + public MultiManagementImpl(String name) throws IOException { + this.name = name; + String enableBranchingString = System.getProperty(ENABLE_SCTP_ASSOC_BRANCHING, "true"); + this.enableBranching = Boolean.valueOf(enableBranchingString); + this.binding = new MultiSctpXMLBinding(enableBranching); + this.binding.setClassAttribute(CLASS_ATTRIBUTE); + if (enableBranching) { + this.binding.setAlias(OneToManyAssociationImpl.class, "association"); + } else { + this.binding.setAlias(OneToOneAssociationImpl.class, "association"); + } + this.binding.setAlias(String.class, "string"); + this.socketSelector = SelectorProvider.provider().openSelector(); + + } + + /** + * @return the name + */ + public String getName() { + return name; + } + + public String getPersistDir() { + return persistDir; + } + + public void setPersistDir(String persistDir) { + this.persistDir = persistDir; + } + + /** + * @return the connectDelay + */ + public int getConnectDelay() { + return connectDelay; + } + + /** + * @param connectDelay the connectDelay to set + */ + public void setConnectDelay(int connectDelay) throws Exception { + if (!this.started) + throw new Exception("ConnectDelay parameter can be updated only when SCTP stack is running"); + + this.connectDelay = connectDelay; + + this.store(); + } + + /** + * @return the workerThreads + */ + public int getWorkerThreads() { + return workerThreads; + } + + /** + * @param workerThreads the workerThreads to set + */ + public void setWorkerThreads(int workerThreads) throws Exception { + if (this.started) + throw new Exception("WorkerThreads parameter can be updated only when SCTP stack is NOT running"); + + if (workerThreads < 1) { + workerThreads = DEFAULT_IO_THREADS; + } + this.workerThreads = workerThreads; + } + + /** + * @return the maxIOErrors + */ + public int getMaxIOErrors() { + return maxIOErrors; + } + + /** + * @param maxIOErrors the maxIOErrors to set + */ + public void setMaxIOErrors(int maxIOErrors) { + if (maxIOErrors < 1) { + maxIOErrors = 1; + } + this.maxIOErrors = maxIOErrors; + } + + /** + * @return the singleThread + */ + public boolean isSingleThread() { + return singleThread; + } + + /** + * @param singleThread the singleThread to set + */ + public void setSingleThread(boolean singleThread) throws Exception { + if (this.started) + throw new Exception("SingleThread parameter can be updated only when SCTP stack is NOT running"); + + this.singleThread = singleThread; + } + + protected FastList getManagementEventListeners() { + return managementEventListeners; + } + + public void addManagementEventListener(ManagementEventListener listener) { + synchronized (this) { + if (this.managementEventListeners.contains(listener)) + return; + + FastList newManagementEventListeners = new FastList(); + newManagementEventListeners.addAll(this.managementEventListeners); + newManagementEventListeners.add(listener); + this.managementEventListeners = newManagementEventListeners; + } + } + + public void removeManagementEventListener(ManagementEventListener listener) { + synchronized (this) { + if (!this.managementEventListeners.contains(listener)) + return; + + FastList newManagementEventListeners = new FastList(); + newManagementEventListeners.addAll(this.managementEventListeners); + newManagementEventListeners.remove(listener); + this.managementEventListeners = newManagementEventListeners; + } + } + + protected MultiChannelController getMultiChannelController() { + return multiChannelController; + } + + public boolean isInBranchingMode() { + return enableBranching; + } + + public void start() throws Exception { + + if (this.started) { + logger.warn(String.format("management=%s is already started", this.name)); + return; + } + + synchronized (this) { + this.persistFile.clear(); + + if (persistDir != null) { + this.persistFile.append(persistDir).append(File.separator).append(this.name).append("_") + .append(PERSIST_FILE_NAME); + } else { + persistFile.append(System.getProperty(SCTP_PERSIST_DIR_KEY, System.getProperty(USER_DIR_KEY))) + .append(File.separator).append(this.name).append("_").append(PERSIST_FILE_NAME); + } + + logger.info(String.format("SCTP configuration file path %s", persistFile.toString())); + + try { + this.load(); + } catch (FileNotFoundException e) { + logger.warn(String.format("Failed to load the SCTP configuration file. \n%s", e.getMessage())); + } + + if (!this.singleThread) { + // If not single thread model we create worker threads + this.executorServices = new ExecutorService[this.workerThreads]; + for (int i = 0; i < this.workerThreads; i++) { + this.executorServices[i] = Executors + .newSingleThreadExecutor(new NamingThreadFactory("SCTP-" + WORKER_POOL_INDEX.incrementAndGet())); + } + } + this.socketSelector = SelectorProvider.provider().openSelector(); + this.selectorThread = new MultiSelectorThread(this.socketSelector, this); + this.selectorThread.setStarted(true); + + (new Thread(this.selectorThread, "SCTP-selector")).start(); + + this.started = true; + + if (logger.isInfoEnabled()) { + logger.info(String.format("Started SCTP Management=%s WorkerThreads=%d SingleThread=%s", this.name, + (this.singleThread ? 0 : this.workerThreads), this.singleThread)); + } + + for (ManagementEventListener lstr : managementEventListeners) { + try { + lstr.onServiceStarted(); + } catch (Throwable ee) { + logger.error("Exception while invoking onServiceStarted", ee); + } + } + } + } + + public void stop() throws Exception { + + if (!this.started) { + logger.warn(String.format("management=%s is already stopped", this.name)); + return; + } + + for (ManagementEventListener lstr : managementEventListeners) { + try { + lstr.onServiceStopped(); + } catch (Throwable ee) { + logger.error("Exception while invoking onServiceStopped", ee); + } + } + + // We store the original state first + this.store(); + + multiChannelController.stopAllMultiplexers(); + // Stop all associations + /* + * FastMap associationsTemp = this.associations; for (FastMap.Entry n = + * associationsTemp.head(), end = associationsTemp.tail(); (n = n.getNext()) != end;) { Association associationTemp = + * n.getValue(); if (associationTemp.isStarted()) { ((OneToManyAssociationImpl) associationTemp).stop(); } } + */ + + if (this.executorServices != null) { + for (int i = 0; i < this.executorServices.length; i++) { + this.executorServices[i].shutdown(); + } + } + + this.selectorThread.setStarted(false); + this.socketSelector.wakeup(); // Wakeup selector so SelectorThread dies + + // waiting till stopping associations + for (int i1 = 0; i1 < 20; i1++) { + boolean assConnected = false; + for (FastMap.Entry n = this.associations.head(), end = this.associations + .tail(); (n = n.getNext()) != end;) { + Association associationTemp = n.getValue(); + if (associationTemp.isConnected()) { + assConnected = true; + break; + } + } + if (!assConnected) + break; + Thread.sleep(100); + } + + // Graceful shutdown for each of Executors + if (this.executorServices != null) { + for (int i = 0; i < this.executorServices.length; i++) { + if (!this.executorServices[i].isTerminated()) { + if (logger.isInfoEnabled()) { + logger.info("Waiting for worker thread to die gracefully ...."); + } + try { + this.executorServices[i].awaitTermination(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // Do we care? + } + } + } + } + + this.started = false; + } + + public boolean isStarted() { + return this.started; + } + + private boolean isConfigPersistanceDisabled() { + String disableConfigPersistanceString = System.getProperty(DISABLE_CONFIG_PERSISTANCE_KEY, "false"); + return Boolean.valueOf(disableConfigPersistanceString); + } + + @SuppressWarnings("unchecked") + public void load() throws FileNotFoundException { + if (isConfigPersistanceDisabled()) { + return; + } + XMLObjectReader reader = null; + try { + reader = XMLObjectReader.newInstance(new FileInputStream(persistFile.toString())); + reader.setBinding(binding); + + try { + this.connectDelay = reader.read(CONNECT_DELAY_PROP, Integer.class); + } catch (java.lang.NullPointerException npe) { + // ignore. + // For backward compatibility we can ignore if these values are not defined + } + + this.associations = reader.read(ASSOCIATIONS, AssociationMap.class); + + for (FastMap.Entry n = this.associations.head(), end = this.associations + .tail(); (n = n.getNext()) != end;) { + n.getValue().setManagement(this); + } + } catch (XMLStreamException ex) { + logger.error("Error while re-creating Linksets from persisted file", ex); + } + } + + public void store() { + if (isConfigPersistanceDisabled()) { + return; + } + try { + XMLObjectWriter writer = XMLObjectWriter.newInstance(new FileOutputStream(persistFile.toString())); + writer.setBinding(binding); + // Enables cross-references. + // writer.setReferenceResolver(new XMLReferenceResolver()); + writer.setIndentation(TAB_INDENT); + + writer.write(this.connectDelay, CONNECT_DELAY_PROP, Integer.class); + + writer.write(this.associations, ASSOCIATIONS, AssociationMap.class); + + writer.close(); + } catch (Exception e) { + logger.error("Error while persisting the Rule state in file", e); + } + } + + public void removeAllResourses() throws Exception { + + synchronized (this) { + if (!this.started) { + throw new Exception(String.format("Management=%s not started", this.name)); + } + + if (this.associations.size() == 0) + // no resources allocated - nothing to do + return; + + if (logger.isInfoEnabled()) { + logger.info(String.format("Removing allocated resources: Associations=%d", this.associations.size())); + } + + synchronized (this) { + + // Remove all associations + ArrayList lst = new ArrayList(); + for (FastMap.Entry n = this.associations.head(), end = this.associations + .tail(); (n = n.getNext()) != end;) { + lst.add(n.getKey()); + } + for (String n : lst) { + this.stopAssociation(n); + this.removeAssociation(n); + } + + // We store the cleared state + this.store(); + } + + for (ManagementEventListener lstr : managementEventListeners) { + try { + lstr.onRemoveAllResources(); + } catch (Throwable ee) { + logger.error("Exception while invoking onRemoveAllResources", ee); + } + } + } + } + + public ManageableAssociation addAssociation(String hostAddress, int hostPort, String peerAddress, int peerPort, + String assocName) throws Exception { + return addAssociation(hostAddress, hostPort, peerAddress, peerPort, assocName, IpChannelType.SCTP, null); + } + + public ManageableAssociation addAssociation(String hostAddress, int hostPort, String peerAddress, int peerPort, + String assocName, IpChannelType ipChannelType, String[] extraHostAddresses) throws Exception { + return addAssociation(hostAddress, hostPort, peerAddress, peerPort, assocName, IpChannelType.SCTP, extraHostAddresses, + null); + } + + public ManageableAssociation addAssociation(String hostAddress, int hostPort, String peerAddress, int peerPort, + String assocName, IpChannelType ipChannelType, String[] extraHostAddresses, String secondaryPeerAddress) + throws Exception { + if (!this.started) { + throw new Exception(String.format("Management=%s not started", this.name)); + } + + if (hostAddress == null) { + throw new Exception("Host address cannot be null"); + } + + if (hostPort < 0) { + throw new Exception("Host port cannot be less than 0"); + } + + if (peerAddress == null) { + throw new Exception("Peer address cannot be null"); + } + + if (peerPort < 1) { + throw new Exception("Peer port cannot be less than 1"); + } + + if (assocName == null) { + throw new Exception("Association name cannot be null"); + } + + synchronized (this) { + for (FastMap.Entry n = this.associations.head(), end = this.associations + .tail(); (n = n.getNext()) != end;) { + Association associationTemp = n.getValue(); + + if (assocName.equals(associationTemp.getName())) { + throw new Exception(String.format("Already has association=%s", associationTemp.getName())); + } + + } + ManageableAssociation association = null; + if (isInBranchingMode()) { + association = new OneToOneAssociationImpl(hostAddress, hostPort, peerAddress, peerPort, assocName, + extraHostAddresses, secondaryPeerAddress); + } else { + association = new OneToManyAssociationImpl(hostAddress, hostPort, peerAddress, peerPort, assocName, + extraHostAddresses, secondaryPeerAddress); + } + + association.setManagement(this); + association.setInitPayloadData(initPayloadData); + + AssociationMap newAssociations = new AssociationMap(); + newAssociations.putAll(this.associations); + newAssociations.put(assocName, association); + this.associations = newAssociations; + + this.store(); + + for (ManagementEventListener lstr : managementEventListeners) { + try { + lstr.onAssociationAdded(association); + } catch (Throwable ee) { + logger.error("Exception while invoking onAssociationAdded", ee); + } + } + + if (logger.isInfoEnabled()) { + logger.info(String.format("Added Associoation=%s of type=%s", association.getName(), + association.getAssociationType())); + } + + return association; + } + } + + public Association getAssociation(String assocName) throws Exception { + if (assocName == null) { + throw new Exception("Association name cannot be null"); + } + Association associationTemp = this.associations.get(assocName); + + if (associationTemp == null) { + throw new Exception(String.format("No Association found for name=%s", assocName)); + } + return associationTemp; + } + + /** + * @return the associations + */ + public Map getAssociations() { + Map routeTmp = new HashMap(); + routeTmp.putAll(this.associations); + return routeTmp; + } + + public void startAssociation(String assocName) throws Exception { + if (!this.started) { + throw new Exception(String.format("Management=%s not started", this.name)); + } + + if (assocName == null) { + throw new Exception("Association name cannot be null"); + } + + ManageableAssociation association = this.associations.get(assocName); + + if (association == null) { + throw new Exception(String.format("No Association found for name=%s", assocName)); + } + + if (association.isStarted()) { + throw new Exception(String.format("Association=%s is already started", assocName)); + } + + association.start(); + this.store(); + } + + public void stopAssociation(String assocName) throws Exception { + if (!this.started) { + throw new Exception(String.format("Management=%s not started", this.name)); + } + + if (assocName == null) { + throw new Exception("Association name cannot be null"); + } + + ManageableAssociation association = this.associations.get(assocName); + + if (association == null) { + throw new Exception(String.format("No Association found for name=%s", assocName)); + } + + association.stop(); + this.store(); + } + + public void removeAssociation(String assocName) throws Exception { + if (!this.started) { + throw new Exception(String.format("Management=%s not started", this.name)); + } + + if (assocName == null) { + throw new Exception("Association name cannot be null"); + } + + synchronized (this) { + Association association = this.associations.get(assocName); + + if (association == null) { + throw new Exception(String.format("No Association found for name=%s", assocName)); + } + + if (association.isStarted()) { + throw new Exception(String.format("Association name=%s is started. Stop before removing", assocName)); + } + + AssociationMap newAssociations = new AssociationMap(); + newAssociations.putAll(this.associations); + newAssociations.remove(assocName); + this.associations = newAssociations; + + this.store(); + + for (ManagementEventListener lstr : managementEventListeners) { + try { + lstr.onAssociationRemoved(association); + } catch (Throwable ee) { + logger.error("Exception while invoking onAssociationRemoved", ee); + } + } + } + } + + public PayloadData getInitPayloadData() { + return initPayloadData; + } + + public void setInitPayloadData(PayloadData initPayloadData) { + this.initPayloadData = initPayloadData; + } + + /** + * @return the pendingChanges + */ + protected FastList getPendingChanges() { + return pendingChanges; + } + + /** + * @return the socketSelector + */ + protected Selector getSocketSelector() { + return socketSelector; + } + + protected void populateWorkerThread(int workerThreadTable[]) { + for (int count = 0; count < workerThreadTable.length; count++) { + if (this.workerThreadCount == this.workerThreads) { + this.workerThreadCount = 0; + } + + workerThreadTable[count] = this.workerThreadCount; + this.workerThreadCount++; + } + } + + protected ExecutorService getExecutorService(int index) { + return this.executorServices[index]; + } + + /* unimplemented management methods */ + @Override + public Server addServer(String serverName, String hostAddress, int port) throws Exception { + throw new UnsupportedOperationException( + MultiManagementImpl.class.getName() + " does not implement server functionality!"); + } + + @Override + public Server addServer(String serverName, String hostAddress, int port, IpChannelType ipChannelType, + boolean acceptAnonymousConnections, int maxConcurrentConnectionsCount, String[] extraHostAddresses) + throws Exception { + throw new UnsupportedOperationException( + MultiManagementImpl.class.getName() + " does not implement server functionality!"); + } + + @Override + public Server addServer(String serverName, String hostAddress, int port, IpChannelType ipChannelType, + String[] extraHostAddresses) throws Exception { + throw new UnsupportedOperationException( + MultiManagementImpl.class.getName() + " does not implement server functionality!"); + } + + @Override + public Association addServerAssociation(String peerAddress, int peerPort, String serverName, String assocName) + throws Exception { + throw new UnsupportedOperationException( + MultiManagementImpl.class.getName() + " does not implement server functionality!"); + } + + @Override + public Association addServerAssociation(String peerAddress, int peerPort, String serverName, String assocName, + IpChannelType ipChannelType) throws Exception { + throw new UnsupportedOperationException( + MultiManagementImpl.class.getName() + " does not support server type associations!"); + } + + @Override + public Association addServerAssociation(String peerAddress, int peerPort, String serverName, String assocName, + IpChannelType ipChannelType, String[] extraHostAddresses) throws Exception { + throw new UnsupportedOperationException( + MultiManagementImpl.class.getName() + " does not support server type associations!"); + } + + @Override + public ServerListener getServerListener() { + return null; + } + + @Override + public List getServers() { + return Collections.emptyList(); + } + + @Override + public void removeServer(String serverName) throws Exception { + throw new UnsupportedOperationException( + MultiManagementImpl.class.getName() + " does not implement server functionality!"); + } + + @Override + public void setServerListener(ServerListener serverListener) { + throw new UnsupportedOperationException( + MultiManagementImpl.class.getName() + " does not implement server functionality!"); + } + + @Override + public void startServer(String serverName) throws Exception { + throw new UnsupportedOperationException( + MultiManagementImpl.class.getName() + " does not implement server functionality!"); + } + + @Override + public void stopServer(String serverName) throws Exception { + throw new UnsupportedOperationException( + MultiManagementImpl.class.getName() + " does not implement server functionality!"); + } +} diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiSctpXMLBinding.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiSctpXMLBinding.java new file mode 100644 index 0000000..0c219aa --- /dev/null +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiSctpXMLBinding.java @@ -0,0 +1,102 @@ +/* + * JBoss, Home of Professional Open Source + * Copyright 2011, Red Hat, Inc. and/or its affiliates, and individual + * contributors as indicated by the @authors tag. All rights reserved. + * See the copyright.txt in the distribution for a full listing + * of individual contributors. + * + * This copyrighted material is made available to anyone wishing to use, + * modify, copy, or redistribute it subject to the terms and conditions + * of the GNU General Public License, v. 2.0. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License, + * v. 2.0 along with this distribution; if not, write to the Free + * Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + * MA 02110-1301, USA. + */ +package org.mobicents.protocols.sctp.multiclient; + +import java.util.Map.Entry; + +import javolution.xml.XMLBinding; +import javolution.xml.XMLFormat; +import javolution.xml.stream.XMLStreamException; + +import org.mobicents.protocols.sctp.AssociationMap; + +/** + * @author amit bhayani + * @author balogh.gabor@alerant.hu + */ +@SuppressWarnings("serial") +public class MultiSctpXMLBinding extends XMLBinding { + + private final boolean isBranched; + + public MultiSctpXMLBinding(boolean isBranched) { + this.isBranched = isBranched; + } + + protected static final XMLFormat> ASSOCIATION_MAP_ONE_TO_ONE = new XMLFormat>( + null) { + + @Override + public void write(AssociationMap obj, javolution.xml.XMLFormat.OutputElement xml) + throws XMLStreamException { + for (Entry entry : obj.entrySet()) { + xml.add((String) entry.getKey(), "name", String.class); + xml.add((ManageableAssociation) entry.getValue(), "association", ManageableAssociation.class); + } + } + + @Override + public void read(javolution.xml.XMLFormat.InputElement xml, AssociationMap obj) + throws XMLStreamException { + while (xml.hasNext()) { + String key = xml.get("name", String.class); + OneToOneAssociationImpl association = xml.get("association", OneToOneAssociationImpl.class); + obj.put(key, association); + } + } + }; + + protected static final XMLFormat> ASSOCIATION_MAP_ONE_TO_MANY = new XMLFormat>( + null) { + + @Override + public void write(AssociationMap obj, javolution.xml.XMLFormat.OutputElement xml) + throws XMLStreamException { + for (Entry entry : obj.entrySet()) { + xml.add((String) entry.getKey(), "name", String.class); + xml.add((ManageableAssociation) entry.getValue(), "association", ManageableAssociation.class); + } + } + + @Override + public void read(javolution.xml.XMLFormat.InputElement xml, AssociationMap obj) + throws XMLStreamException { + while (xml.hasNext()) { + String key = xml.get("name", String.class); + OneToManyAssociationImpl association = null; + association = xml.get("association", OneToManyAssociationImpl.class); + obj.put(key, association); + } + } + }; + + @SuppressWarnings("rawtypes") + protected XMLFormat getFormat(Class forClass) throws XMLStreamException { + if (AssociationMap.class.equals(forClass)) { + if (isBranched) { + return ASSOCIATION_MAP_ONE_TO_ONE; + } + return ASSOCIATION_MAP_ONE_TO_MANY; + } + return super.getFormat(forClass); + } +} diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiSelectorThread.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiSelectorThread.java new file mode 100644 index 0000000..70de8ae --- /dev/null +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiSelectorThread.java @@ -0,0 +1,228 @@ +/* + * JBoss, Home of Professional Open Source + * Copyright 2011, Red Hat, Inc. and/or its affiliates, and individual + * contributors as indicated by the @authors tag. All rights reserved. + * See the copyright.txt in the distribution for a full listing + * of individual contributors. + * + * This copyrighted material is made available to anyone wishing to use, + * modify, copy, or redistribute it subject to the terms and conditions + * of the GNU General Public License, v. 2.0. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License, + * v. 2.0 along with this distribution; if not, write to the Free + * Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + * MA 02110-1301, USA. + */ + +package org.mobicents.protocols.sctp.multiclient; + +import java.io.IOException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; + +import javolution.util.FastList; + +import org.apache.log4j.Logger; + +/** + * This class controls the nio sockets and manages the I/O operations. + * + * @author amit bhayani + * @author balogh.gabo@alerant.hu + * + */ + +public class MultiSelectorThread implements Runnable { + + protected static final Logger logger = Logger.getLogger(MultiSelectorThread.class); + + protected Selector selector; + + protected MultiManagementImpl management = null; + + protected volatile boolean started = true; + + /** + * Creates the MultiSelector instance for the given MultiManagementImpl (SCTP stack) and Selector + * + * @param selector + * @param management + */ + protected MultiSelectorThread(Selector selector, MultiManagementImpl management) { + super(); + this.selector = selector; + this.management = management; + } + + /** + * @param started the started to set + */ + protected void setStarted(boolean started) { + this.started = started; + } + + @Override + public void run() { + if (logger.isInfoEnabled()) { + logger.info(String.format("SelectorThread for Management=%s started.", this.management.getName())); + } + while (this.started) { + try { + FastList pendingChanges = this.management.getPendingChanges(); + + // Process any pending changes + synchronized (pendingChanges) { + Iterator changes = pendingChanges.iterator(); + while (changes.hasNext()) { + MultiChangeRequest change = changes.next(); + + SelectionKey key = change.getSocketChannel() == null ? null + : change.getSocketChannel().keyFor(this.selector); + if (logger.isDebugEnabled()) { + if (key != null && key.isValid()) { + logger.debug( + "change=" + change + ": key=" + key + " of socketChannel=" + change.getSocketChannel() + + " for selector=" + this.selector + " key interesOps=" + key.interestOps()); + } + } + switch (change.getType()) { + case MultiChangeRequest.CHANGEOPS: + pendingChanges.remove(change); + if (key == null) { + logger.warn("change=" + change + ": key is null", + new NullPointerException("Selection key is null")); + } else if (!key.isValid()) { + logger.warn("change=" + change + ": key=" + key + " key is invalid", + new InternalError("Selection key is invalid")); + } else { + key.interestOps(change.getOps()); + } + break; + case MultiChangeRequest.ADD_OPS: + pendingChanges.remove(change); + if (key == null) { + logger.warn("change=" + change + ": key is null", + new NullPointerException("Selection key is null")); + } else if (!key.isValid()) { + logger.warn("change=" + change + ": key=" + key + " key is invalid", + new InternalError("Selection key is invalid")); + } else { + key.interestOps(key.interestOps() | change.getOps()); + } + break; + case MultiChangeRequest.REGISTER: + pendingChanges.remove(change); + + SelectionKey key1 = change.getSocketChannel().register(this.selector, change.getOps()); + + if (change.isMultiAssocRequest()) { + key1.attach(change.getAssocMultiplexer()); + if (logger.isDebugEnabled()) { + logger.debug("Key=" + key1 + "is registered to channel=" + change.getSocketChannel() + + " of the association=" + change.getAssocMultiplexer()); + } + } else { + key1.attach(change.getAssociation()); + if (logger.isDebugEnabled()) { + logger.debug("Key=" + key1 + "is registered to channel=" + change.getSocketChannel() + + " of the association=" + change.getAssociation()); + } + } + break; + case MultiChangeRequest.CONNECT: + if (!change.getAssociation().isStarted()) { + pendingChanges.remove(change); + } else { + if (change.getExecutionTime() <= System.currentTimeMillis()) { + pendingChanges.remove(change); + change.getAssociation().reconnect(); + } + } + break; + case MultiChangeRequest.CLOSE: + pendingChanges.remove(change); + if (!change.isMultiAssocRequest()) { + change.getAssociation().close(); + } + break; + } + } + } + + // Wait for an event one of the registered channels + this.selector.select(500); + + // Iterate over the set of keys for which events are available + Iterator selectedKeys = this.selector.selectedKeys().iterator(); + + while (selectedKeys.hasNext()) { + SelectionKey key = selectedKeys.next(); + selectedKeys.remove(); + + if (!key.isValid()) { + continue; + } + + // Check what event is available and deal with it + if (key.isConnectable()) { + logger.error("Illegal selectionKey state: connectable"); + } + if (key.isAcceptable()) { + logger.error("Illegal selectionKey state: acceptable"); + } + if (key.isReadable()) { + this.read(key); + } + if (key.isWritable()) { + this.write(key); + } + } + } catch (CancelledKeyException cke) { + // having this exception when closing a channel can be normal, but we log it on WARN level + logger.warn("Selecting a cancelled ready key: " + cke.getMessage()); + } catch (Exception e) { + logger.error("Error while selecting the ready keys", e); + e.printStackTrace(); + } + } + + try { + this.selector.close(); + } catch (IOException e) { + logger.error(String.format("Error while closing Selector for SCTP Management=%s", this.management.getName())); + } + + if (logger.isInfoEnabled()) { + logger.info(String.format("SelectorThread for Management=%s stopped.", this.management.getName())); + } + } + + private void read(SelectionKey key) throws IOException { + if (key.attachment() instanceof OneToManyAssocMultiplexer) { + OneToManyAssocMultiplexer multiplexer = (OneToManyAssocMultiplexer) key.attachment(); + multiplexer.read(); + } else if (key.attachment() instanceof OneToOneAssociationImpl) { + OneToOneAssociationImpl association = (OneToOneAssociationImpl) key.attachment(); + association.read(); + } + } + + private void write(SelectionKey key) throws IOException { + if (key.attachment() instanceof OneToManyAssocMultiplexer) { + OneToManyAssocMultiplexer multiplexer = (OneToManyAssocMultiplexer) key.attachment(); + multiplexer.write(key); + } else if (key.attachment() instanceof OneToOneAssociationImpl) { + OneToOneAssociationImpl association = (OneToOneAssociationImpl) key.attachment(); + association.write(key); + } + } + +} diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiWorker.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiWorker.java new file mode 100644 index 0000000..01da0fb --- /dev/null +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/MultiWorker.java @@ -0,0 +1,61 @@ +/* + * JBoss, Home of Professional Open Source + * Copyright 2011, Red Hat, Inc. and/or its affiliates, and individual + * contributors as indicated by the @authors tag. All rights reserved. + * See the copyright.txt in the distribution for a full listing + * of individual contributors. + * + * This copyrighted material is made available to anyone wishing to use, + * modify, copy, or redistribute it subject to the terms and conditions + * of the GNU General Public License, v. 2.0. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License, + * v. 2.0 along with this distribution; if not, write to the Free + * Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + * MA 02110-1301, USA. + */ +package org.mobicents.protocols.sctp.multiclient; + +import org.mobicents.protocols.api.Association; +import org.mobicents.protocols.api.AssociationListener; +import org.mobicents.protocols.api.PayloadData; + +/** + * The MultiWorker class is a runnable task which runs the onPayload callback method of the associated associationListener. + * + * @author amit bhayani + * @author balogh.gabor@alerant.hu + */ +public class MultiWorker implements Runnable { + + private Association association; + private AssociationListener associationListener = null; + private PayloadData payloadData = null; + + /** + * @param association + * @param associationListener + * @param payloadData + */ + protected MultiWorker(Association association, AssociationListener associationListener, PayloadData payloadData) { + super(); + this.association = association; + this.associationListener = associationListener; + this.payloadData = payloadData; + } + + @Override + public void run() { + try { + this.associationListener.onPayload(this.association, this.payloadData); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/NamingThreadFactory.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/NamingThreadFactory.java new file mode 100644 index 0000000..9c4e4e2 --- /dev/null +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/NamingThreadFactory.java @@ -0,0 +1,40 @@ +package org.mobicents.protocols.sctp.multiclient; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Thread factory which names threads by "pool--thread-n". This is a replacement for Executors.defaultThreadFactory() + * to be able to identify pools. Optionally a delegate thread factory can be given which creates the Thread object itself, if no + * delegate has been given, Executors.defaultThreadFactory is used. + * + * @author pocsaji.miklos@alerant.hu + */ +public class NamingThreadFactory implements ThreadFactory { + + private ThreadFactory delegate; + private String baseName; + private AtomicInteger index; + + public NamingThreadFactory(String baseName) { + this(baseName, null); + } + + public NamingThreadFactory(String baseName, ThreadFactory delegate) { + this.baseName = baseName; + this.delegate = delegate; + if (this.delegate == null) { + this.delegate = Executors.defaultThreadFactory(); + } + this.index = new AtomicInteger(1); + } + + @Override + public Thread newThread(Runnable r) { + String name = "pool-" + baseName + "-thread-" + index.getAndIncrement(); + Thread ret = delegate.newThread(r); + ret.setName(name); + return ret; + } +} diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToManyAssocMultiplexer.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToManyAssocMultiplexer.java new file mode 100644 index 0000000..534103b --- /dev/null +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToManyAssocMultiplexer.java @@ -0,0 +1,465 @@ +package org.mobicents.protocols.sctp.multiclient; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javolution.util.FastList; + +import org.apache.log4j.Logger; +import org.mobicents.protocols.api.PayloadData; +import org.mobicents.protocols.sctp.multiclient.ManageableAssociation.HostAddressInfo; + +import com.sun.nio.sctp.MessageInfo; +import com.sun.nio.sctp.SctpChannel; +import com.sun.nio.sctp.SctpMultiChannel; + +/** + * Controls the read, write and init operations of SCTP associations of a SctpMultiChannel. + * + * @author balogh.gabor@alerant.hu + */ +@SuppressWarnings("restriction") +public class OneToManyAssocMultiplexer { + private static final Logger logger = Logger.getLogger(OneToManyAssocMultiplexer.class); + + private MultiManagementImpl management; + + private HostAddressInfo hostAddressInfo; + private SctpMultiChannel socketMultiChannel; + + // The buffer into which we'll read data when it's available + private ByteBuffer rxBuffer = ByteBuffer.allocateDirect(8192); + + // Is the multiplexer been started by management? + private AtomicBoolean started = new AtomicBoolean(false); + + // Queue holds payloads to be transmitted + private ConcurrentLinkedQueueSwapper txQueueSwapper = new ConcurrentLinkedQueueSwapper( + new ConcurrentLinkedQueue()); + + private CopyOnWriteArrayList pendingAssocs = new CopyOnWriteArrayList(); + private ConcurrentHashMap connectedAssocs = new ConcurrentHashMap(); + + protected final MultiAssociationHandler associationHandler = new MultiAssociationHandler(); + + /* + * Support fast and save queue operations like: swap, conactAsHead. + * + */ + static class ConcurrentLinkedQueueSwapper { + private ReadWriteLock lock = new ReentrantReadWriteLock(); + private ConcurrentLinkedQueue queue; + + public ConcurrentLinkedQueueSwapper(ConcurrentLinkedQueue queue) { + this.queue = queue; + } + + public void add(T e) { + lock.readLock().lock(); + queue.add(e); + lock.readLock().unlock(); + } + + public boolean isEmpty() { + return queue.isEmpty(); + } + + public ConcurrentLinkedQueue swap(ConcurrentLinkedQueue newQueue) { + if (newQueue == null) { + throw new NullPointerException( + this.getClass() + ".swap(ConcurrentLinkedQueue newQueue): newQueue parameter can not be null!"); + } + ConcurrentLinkedQueue newQueueCopy = new ConcurrentLinkedQueue(newQueue); + lock.writeLock().lock(); + ConcurrentLinkedQueue oldQueue = this.queue; + this.queue = newQueueCopy; + lock.writeLock().unlock(); + return oldQueue; + } + + public void concatAsHead(ConcurrentLinkedQueue newHead) { + if (newHead == null) { + throw new NullPointerException(this.getClass() + + ".concatAsHead(ConcurrentLinkedQueue newHead): newHead parameter can not be null!"); + } + ConcurrentLinkedQueue newQueueCopy = new ConcurrentLinkedQueue(newHead); + lock.writeLock().lock(); + for (T e : this.queue) { + newQueueCopy.add(e); + } + this.queue = newQueueCopy; + lock.writeLock().unlock(); + } + + } + + public OneToManyAssocMultiplexer(HostAddressInfo hostAddressInfo, MultiManagementImpl management) throws IOException { + super(); + if (hostAddressInfo == null || management == null) { + throw new IllegalArgumentException( + "Constructor OneToManyAssocMultiplexer: hostAddressInfo and management parameters can not be null!"); + } + this.hostAddressInfo = hostAddressInfo; + this.management = management; + this.rxBuffer.clear(); + this.rxBuffer.rewind(); + this.rxBuffer.flip(); + initMultiChannel(); + started.set(true); + } + + protected void registerAssociation(ManageableAssociation association) { + if (!started.get()) { + throw new IllegalStateException("OneToManyAssocMultiplexer is stopped!"); + } + + pendingAssocs.add(association); + } + + protected void unregisterAssociation(ManageableAssociation association) { + if (!started.get()) { + throw new IllegalStateException("OneToManyAssocMultiplexer is stopped!"); + } + + if (!pendingAssocs.remove(association)) { + connectedAssocs.remove(association); + } + } + + protected void start() throws IOException { + if (!started.compareAndSet(false, true)) { + return; + } + this.rxBuffer.clear(); + this.rxBuffer.rewind(); + this.rxBuffer.flip(); + initMultiChannel(); + } + + protected void assignSctpAssocIdToAssociation(Integer id, ManageableAssociation association) { + if (!started.get()) { + throw new IllegalStateException("OneToManyAssocMultiplexer is stoped!"); + } + if (id == null || association == null) { + return; + } + connectedAssocs.put(id, association); + pendingAssocs.remove(association); + association.assignSctpAssociationId(id); + } + + protected ManageableAssociation findConnectedAssociation(Integer sctpAssocId) { + return connectedAssocs.get(sctpAssocId); + } + + private String extractPeerAddresses(com.sun.nio.sctp.Association sctpAssociation) { + String peerAddresses = ""; + try { + for (SocketAddress sa : getSocketMultiChannel().getRemoteAddresses(sctpAssociation)) { + peerAddresses += ", " + sa.toString(); + } + } catch (IOException e) { + } + return peerAddresses; + } + + protected ManageableAssociation findPendingAssociation(com.sun.nio.sctp.Association sctpAssociation) { + String peerAddresses = extractPeerAddresses(sctpAssociation); + if (logger.isDebugEnabled()) { + peerAddresses = peerAddresses.isEmpty() ? peerAddresses : peerAddresses.substring(2); + logger.debug("Association(" + sctpAssociation.associationID() + ") connected to " + peerAddresses); + } + ManageableAssociation ret = null; + for (ManageableAssociation assocImpl : pendingAssocs) { + if (assocImpl.isConnectedToPeerAddresses(peerAddresses)) { + ret = assocImpl; + break; + } + } + return ret; + } + + protected ManageableAssociation findPendingAssociationByAddress(SocketAddress address) { + String peerAddress = address.toString(); + if (logger.isDebugEnabled()) { + logger.debug("findPendingAssociationByAddress is called with address parameter=" + peerAddress); + } + ManageableAssociation ret = null; + for (ManageableAssociation assocImpl : pendingAssocs) { + if (assocImpl.isConnectedToPeerAddresses(peerAddress)) { + ret = assocImpl; + break; + } + } + return ret; + } + + private void initMultiChannel() throws IOException { + try { + socketMultiChannel = SctpMultiChannel.open(); + socketMultiChannel.configureBlocking(false); + socketMultiChannel.bind( + new InetSocketAddress(this.hostAddressInfo.getPrimaryHostAddress(), this.hostAddressInfo.getHostPort())); + if (this.hostAddressInfo.getSecondaryHostAddress() != null + && !this.hostAddressInfo.getSecondaryHostAddress().isEmpty()) { + socketMultiChannel.bindAddress(InetAddress.getByName(this.hostAddressInfo.getSecondaryHostAddress())); + } + } catch (IOException ex) { + logger.warn("Error while init multi channel ", ex); + if (socketMultiChannel != null && socketMultiChannel.isOpen()) { + try { + socketMultiChannel.close(); + } catch (IOException closeEx) { + } + ; + } + throw ex; + } + + if (logger.isDebugEnabled()) { + logger.debug("New socketMultiChanel is created: " + socketMultiChannel + " supported options: " + + socketMultiChannel.validOps() + ":" + socketMultiChannel.supportedOptions()); + } + FastList pendingChanges = this.management.getPendingChanges(); + synchronized (pendingChanges) { + pendingChanges.add(new MultiChangeRequest(this.socketMultiChannel, this, null, MultiChangeRequest.REGISTER, + SelectionKey.OP_WRITE | SelectionKey.OP_READ)); + } + } + + public HostAddressInfo getHostAddressInfo() { + return hostAddressInfo; + } + + public SctpMultiChannel getSocketMultiChannel() { + return socketMultiChannel; + } + + private ManageableAssociation getAssociationByMessageInfo(MessageInfo msgInfo) { + ManageableAssociation ret = null; + // find connected assoc + if (msgInfo.association() != null) { + ret = findConnectedAssociation(msgInfo.association().associationID()); + } + // find in pending assoc + if (ret == null) { + ret = findPendingAssociation(msgInfo.association()); + } + return ret; + } + + protected void send(PayloadData payloadData, MessageInfo messageInfo, ManageableAssociation sender) throws IOException { + send(payloadData, messageInfo, sender, false); + } + + protected void send(PayloadData payloadData, MessageInfo messageInfo, ManageableAssociation sender, boolean initMsg) throws IOException { + if (!started.get()) { + return; + } + FastList pendingChanges = this.management.getPendingChanges(); + synchronized (pendingChanges) { + + // Indicate we want the interest ops set changed + pendingChanges.add(new MultiChangeRequest(this.getSocketMultiChannel(), this, null, MultiChangeRequest.ADD_OPS, + SelectionKey.OP_WRITE)); + + this.txQueueSwapper.add(new SctpMessage(payloadData, messageInfo, sender, initMsg)); + } + + // Finally, wake up our selecting thread so it can make the required + // changes + this.management.getSocketSelector().wakeup(); + } + + protected void write(SelectionKey key) { + if (!started.get()) { + return; + } + ConcurrentLinkedQueue txQueueTmp = txQueueSwapper.swap(new ConcurrentLinkedQueue()); + + if (txQueueTmp.isEmpty()) { + // We wrote away all data, so we're no longer interested + // in writing on this socket. Switch back to waiting for + // data. + key.interestOps(SelectionKey.OP_READ); + if (logger.isDebugEnabled()) { + logger.debug("write: txQueue was empty"); + } + return; + } + + while (!txQueueTmp.isEmpty()) { + SctpMessage msg = txQueueTmp.poll(); + msg.getSenderAssoc().writePayload(msg.getPayloadData(), msg.isInitMsg()); + } + + if (txQueueTmp.isEmpty()) { + // We wrote away all data, so we're no longer interested + // in writing on this socket. Switch back to waiting for + // data. + key.interestOps(SelectionKey.OP_READ); + } + } + + private void doReadSctp() throws IOException { + + rxBuffer.clear(); + MessageInfo messageInfo = null; + messageInfo = this.socketMultiChannel.receive(rxBuffer, this, this.associationHandler); + + if (messageInfo == null) { + if (logger.isDebugEnabled()) { + logger.debug(String.format(" messageInfo is null for AssociationMultiplexer=%s", this)); + } + return; + } + + int len = messageInfo.bytes(); + if (len == -1) { + logger.error( + String.format("Rx -1 while trying to read from underlying socket for AssociationMultiplexer=%s ", this)); + return; + } + + rxBuffer.flip(); + byte[] data = new byte[len]; + rxBuffer.get(data); + rxBuffer.clear(); + + PayloadData payload = new PayloadData(len, data, messageInfo.isComplete(), messageInfo.isUnordered(), + messageInfo.payloadProtocolID(), messageInfo.streamNumber()); + + ManageableAssociation assoc = getAssociationByMessageInfo(messageInfo); + if (assoc != null) { + assoc.readPayload(payload); + } + + } + + protected void read() { + if (!started.get()) { + return; + } + try { + doReadSctp(); + } catch (IOException e) { + logger.error("Unable to read from socketMultiChannek, hostAddressInfo: " + this.hostAddressInfo, e); + } catch (Exception ex) { + logger.error( + "Unexpected exception: unnable to read from socketMultiChannek, hostAddressInfo: " + this.hostAddressInfo, + ex); + } + } + + protected ManageableAssociation resolveAssociationImpl(com.sun.nio.sctp.Association sctpAssociation) { + if (!started.get()) { + return null; + } + ManageableAssociation association = findConnectedAssociation(sctpAssociation.associationID()); + if (association == null) { + association = findPendingAssociation(sctpAssociation); + assignSctpAssocIdToAssociation(sctpAssociation.associationID(), association); + + if (management.isInBranchingMode()) { + if (logger.isInfoEnabled()) { + logger.info("Branching association: " + association.getName()); + } + try { + SctpChannel sctpChannel = getSocketMultiChannel().branch(sctpAssociation); + if (sctpChannel.isBlocking()) { + sctpChannel.configureBlocking(false); + } + + OneToOneAssociationImpl oneToOneAssoc = (OneToOneAssociationImpl) association; + oneToOneAssoc.branch(sctpChannel, management); + + if (logger.isDebugEnabled()) { + logger.debug("resolveAssociationImpl result for sctpAssocId: " + sctpAssociation.associationID() + + " is " + association); + } + return oneToOneAssoc; + } catch (Exception ex) { + logger.error(ex); + } + } + } + ; + if (logger.isDebugEnabled()) { + logger.debug( + "resolveAssociationImpl result for sctpAssocId: " + sctpAssociation.associationID() + " is " + association); + } + return association; + } + + protected void stop() throws IOException { + if (!started.compareAndSet(true, false)) { + return; + } + + for (ManageableAssociation assocImpl : connectedAssocs.values()) { + try { + assocImpl.stop(); + } catch (Exception ex) { + logger.warn(ex); + } + } + connectedAssocs.clear(); + for (ManageableAssociation assocImpl : pendingAssocs) { + try { + assocImpl.stop(); + } catch (Exception e) { + logger.warn(e); + ; + } + } + pendingAssocs.clear(); + this.socketMultiChannel.close(); + } + + static class SctpMessage { + private final PayloadData payloadData; + private final MessageInfo messageInfo; + private final ManageableAssociation senderAssoc; + private final boolean initMsg; + + protected SctpMessage(PayloadData payloadData, MessageInfo messageInfo, ManageableAssociation senderAssoc, boolean initMsg) { + super(); + this.payloadData = payloadData; + this.messageInfo = messageInfo; + this.senderAssoc = senderAssoc; + this.initMsg = initMsg; + } + + protected PayloadData getPayloadData() { + return payloadData; + } + + protected MessageInfo getMessageInfo() { + return messageInfo; + } + + protected ManageableAssociation getSenderAssoc() { + return senderAssoc; + } + + public boolean isInitMsg() { + return initMsg; + } + + @Override + public String toString() { + return "SctpMessage [payloadData=" + payloadData + ", messageInfo=" + messageInfo + ", senderAssoc=" + senderAssoc + + ", initMsg=" + initMsg + "]"; + } + } +} diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToManyAssociationHandler.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToManyAssociationHandler.java new file mode 100644 index 0000000..6d75f07 --- /dev/null +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToManyAssociationHandler.java @@ -0,0 +1,185 @@ +package org.mobicents.protocols.sctp.multiclient; + +import org.apache.log4j.Logger; +import org.apache.log4j.Priority; + +import com.sun.nio.sctp.AbstractNotificationHandler; +import com.sun.nio.sctp.AssociationChangeNotification; +import com.sun.nio.sctp.HandlerResult; +import com.sun.nio.sctp.Notification; +import com.sun.nio.sctp.PeerAddressChangeNotification; +import com.sun.nio.sctp.SendFailedNotification; +import com.sun.nio.sctp.ShutdownNotification; + +/** + * Handles notifications for OneToManyAssociationImpl objects. + * + * @author balogh.gabor@alerant.hu + * + */ +@SuppressWarnings("restriction") +public class OneToManyAssociationHandler extends AbstractNotificationHandler { + + private static final Logger logger = Logger.getLogger(OneToManyAssociationHandler.class); + + private volatile int maxInboundStreams = 1; + private volatile int maxOutboundStreams = 1; + + public OneToManyAssociationHandler() { + + } + + /** + * @return the maxInboundStreams + */ + public int getMaxInboundStreams() { + return maxInboundStreams; + } + + /** + * @return the maxOutboundStreams + */ + public int getMaxOutboundStreams() { + return maxOutboundStreams; + } + + @Override + public HandlerResult handleNotification(Notification arg0, OneToManyAssociationImpl arg1) { + if (arg0 instanceof AssociationChangeNotification) { + return handleNotification((AssociationChangeNotification) arg0, arg1); + } + if (arg0 instanceof ShutdownNotification) { + return handleNotification((ShutdownNotification) arg0, arg1); + } + if (arg0 instanceof SendFailedNotification) { + return handleNotification((SendFailedNotification) arg0, arg1); + } + if (arg0 instanceof PeerAddressChangeNotification) { + return handleNotification((PeerAddressChangeNotification) arg0, arg1); + } + return super.handleNotification(arg0, arg1); + } + + @Override + public HandlerResult handleNotification(AssociationChangeNotification not, OneToManyAssociationImpl association) { + + switch (not.event()) { + case COMM_UP: + // in case when comm is go online but the association has been already stopped COMM_UP event is sinked. + if (!association.isStarted()) { + return HandlerResult.CONTINUE; + } + if (not.association() != null) { + this.maxOutboundStreams = not.association().maxOutboundStreams(); + this.maxInboundStreams = not.association().maxInboundStreams(); + } + + if (logger.isInfoEnabled()) { + logger.info(String.format( + "New association setup for Association=%s with %d outbound streams, and %d inbound streams, sctp assoc is %s.\n", + association.getName(), this.maxOutboundStreams, this.maxInboundStreams, not.association())); + } + + association.createworkerThreadTable(Math.max(this.maxInboundStreams, this.maxOutboundStreams)); + + try { + association.markAssociationUp(); + association.getAssociationListener().onCommunicationUp(association, this.maxInboundStreams, + this.maxOutboundStreams); + } catch (Exception e) { + logger.error( + String.format("Exception while calling onCommunicationUp on AssociationListener for Association=%s", + association.getName()), + e); + } + return HandlerResult.CONTINUE; + + case CANT_START: + logger.error(String.format("Can't start for Association=%s", association.getName())); + association.switchInitSocketAddress(); + association.scheduleConnect(); + return HandlerResult.CONTINUE; + case COMM_LOST: + logger.warn(String.format("Communication lost for Association=%s", association.getName())); + + // Close the Socket + association.close(); + association.scheduleConnect(); + try { + association.markAssociationDown(); + association.getAssociationListener().onCommunicationLost(association); + } catch (Exception e) { + logger.error(String.format( + "Exception while calling onCommunicationLost on AssociationListener for Association=%s", + association.getName()), e); + } + return HandlerResult.RETURN; + case RESTART: + logger.warn(String.format("Restart for Association=%s", association.getName())); + try { + association.getAssociationListener().onCommunicationRestart(association); + } catch (Exception e) { + logger.error(String.format( + "Exception while calling onCommunicationRestart on AssociationListener for Association=%s", + association.getName()), e); + } + return HandlerResult.CONTINUE; + case SHUTDOWN: + if (logger.isInfoEnabled()) { + logger.info(String.format("Shutdown for Association=%s", association.getName())); + } + try { + association.markAssociationDown(); + association.getAssociationListener().onCommunicationShutdown(association); + association.scheduleConnect(); + } catch (Exception e) { + logger.error(String.format( + "Exception while calling onCommunicationShutdown on AssociationListener for Association=%s", + association.getName()), e); + } + return HandlerResult.RETURN; + default: + logger.warn(String.format("Received unkown Event=%s for Association=%s", not.event(), association.getName())); + break; + } + + return HandlerResult.CONTINUE; + } + + @Override + public HandlerResult handleNotification(ShutdownNotification not, OneToManyAssociationImpl associtaion) { + if (logger.isInfoEnabled()) { + logger.info(String.format("Association=%s SHUTDOWN", associtaion.getName())); + } + + try { + associtaion.markAssociationDown(); + associtaion.getAssociationListener().onCommunicationShutdown(associtaion); + } catch (Exception e) { + logger.error( + String.format("Exception while calling onCommunicationShutdown on AssociationListener for Association=%s", + associtaion.getName()), + e); + } + + return HandlerResult.RETURN; + } + + @Override + public HandlerResult handleNotification(SendFailedNotification notification, OneToManyAssociationImpl associtaion) { + logger.error(String.format( + "Association=" + associtaion.getName() + " SendFailedNotification, errorCode=" + notification.errorCode())); + associtaion.onSendFailed(); + return HandlerResult.RETURN; + } + + @Override + public HandlerResult handleNotification(PeerAddressChangeNotification notification, OneToManyAssociationImpl associtaion) { + if (logger.isEnabledFor(Priority.INFO)) { + logger.info(String.format("Peer Address changed to=%s for Association=%s", notification.address(), + associtaion.getName())); + } + return HandlerResult.CONTINUE; + } + +} diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToManyAssociationImpl.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToManyAssociationImpl.java new file mode 100644 index 0000000..098d88f --- /dev/null +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToManyAssociationImpl.java @@ -0,0 +1,559 @@ +package org.mobicents.protocols.sctp.multiclient; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.spi.AbstractSelectableChannel; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +import javolution.util.FastList; +import javolution.xml.XMLFormat; +import javolution.xml.stream.XMLStreamException; + +import org.apache.log4j.Logger; +import org.mobicents.protocols.api.AssociationListener; +import org.mobicents.protocols.api.AssociationType; +import org.mobicents.protocols.api.IpChannelType; +import org.mobicents.protocols.api.ManagementEventListener; +import org.mobicents.protocols.api.PayloadData; + +import com.sun.nio.sctp.MessageInfo; + +/** + * Implements a one-to-many type ManagableAssociation. Used when associations is NOT peeled off the sctp multi channel sockets + * to a separate sctp socket channel. + * + * @author balogh.gabor@alerant.hu + */ +@SuppressWarnings("restriction") +public class OneToManyAssociationImpl extends ManageableAssociation { + + protected static final Logger logger = Logger.getLogger(OneToManyAssociationImpl.class); + + private static final String NAME = "name"; + private static final String SERVER_NAME = "serverName"; + private static final String HOST_ADDRESS = "hostAddress"; + private static final String HOST_PORT = "hostPort"; + + private static final String PEER_ADDRESS = "peerAddress"; + private static final String PEER_PORT = "peerPort"; + + private static final String ASSOCIATION_TYPE = "assoctype"; + private static final String IPCHANNEL_TYPE = "ipChannelType"; + private static final String EXTRA_HOST_ADDRESS = "extraHostAddress"; + private static final String EXTRA_HOST_ADDRESS_SIZE = "extraHostAddresseSize"; + + private AssociationListener associationListener = null; + + private ByteBuffer txBuffer = ByteBuffer.allocateDirect(8192); + + protected final OneToManyAssociationHandler associationHandler = new OneToManyAssociationHandler(); + + // Is the Association been started by management? + private AtomicBoolean started = new AtomicBoolean(false); + // Is the Association up (connection is established) + protected AtomicBoolean up = new AtomicBoolean(false); + + private int workerThreadTable[] = null; + + private volatile MessageInfo msgInfo; + + private volatile com.sun.nio.sctp.Association sctpAssociation; + private final IpChannelType ipChannelType = IpChannelType.SCTP; + + private OneToManyAssocMultiplexer multiplexer; + + /** + * Count of number of IO Errors occured. + */ + private volatile int ioErrors = 0; + + public OneToManyAssociationImpl() { + txBuffer.clear(); + txBuffer.rewind(); + txBuffer.flip(); + } + + /** + * Creating a CLIENT Association + * + * @param hostAddress + * @param hostPort + * @param peerAddress + * @param peerPort + * @param assocName + * @param ipChannelType + * @param extraHostAddresses + * @throws IOException + */ + public OneToManyAssociationImpl(String hostAddress, int hostPort, String peerAddress, int peerPort, String assocName, + String[] extraHostAddresses) throws IOException { + this(hostAddress, hostPort, peerAddress, peerPort, assocName, extraHostAddresses, null); + } + + public OneToManyAssociationImpl(String hostAddress, int hostPort, String peerAddress, int peerPort, String assocName, + String[] extraHostAddresses, String secondaryPeerAddress) throws IOException { + super(hostAddress, hostPort, peerAddress, peerPort, assocName, extraHostAddresses); + // clean transmission buffer + txBuffer.clear(); + txBuffer.rewind(); + txBuffer.flip(); + } + + public void start() throws Exception { + + if (this.associationListener == null) { + throw new NullPointerException(String.format("AssociationListener is null for Associatoion=%s", this.name)); + } + + if (started.getAndSet(true)) { + logger.warn("Association: " + this + " has been already STARTED"); + return; + } + for (ManagementEventListener lstr : this.management.getManagementEventListeners()) { + try { + lstr.onAssociationStarted(this); + } catch (Throwable ee) { + logger.error("Exception while invoking onAssociationStarted", ee); + } + } + scheduleConnect(); + } + + public void stop() throws Exception { + if (!started.getAndSet(false)) { + logger.warn("Association: " + this + " has been already STOPPED"); + return; + } + + for (ManagementEventListener lstr : this.management.getManagementEventListeners()) { + try { + lstr.onAssociationStopped(this); + } catch (Throwable ee) { + logger.error("Exception while invoking onAssociationStopped", ee); + } + } + + try { + this.associationListener.onCommunicationShutdown(this); + } catch (Exception e) { + logger.error(String.format( + "Exception while calling onCommunicationShutdown on AssociationListener for Association=%s", this.name), e); + } + + } + + public IpChannelType getIpChannelType() { + return IpChannelType.SCTP; + } + + /** + * @return the associationListener + */ + public AssociationListener getAssociationListener() { + return associationListener; + } + + /** + * @param associationListener the associationListener to set + */ + public void setAssociationListener(AssociationListener associationListener) { + this.associationListener = associationListener; + } + + /** + * @return the assocName + */ + public String getName() { + return name; + } + + /** + * @return the associationType + */ + public AssociationType getAssociationType() { + return AssociationType.CLIENT; + } + + /** + * @return the started + */ + @Override + public boolean isStarted() { + return started.get(); + } + + @Override + public boolean isConnected() { + return started.get() && up.get(); + } + + @Override + public boolean isUp() { + return up.get(); + } + + protected void markAssociationUp() { + if (up.getAndSet(true)) { + if (logger.isDebugEnabled()) { + logger.debug("Association: " + this + " has been already marked UP"); + } + return; + } + for (ManagementEventListener lstr : this.management.getManagementEventListeners()) { + try { + lstr.onAssociationUp(this); + } catch (Throwable ee) { + logger.error("Exception while invoking onAssociationUp", ee); + } + } + } + + protected void markAssociationDown() { + if (!up.getAndSet(false)) { + if (logger.isDebugEnabled()) { + logger.debug("Association: " + this + " has been already marked DOWN"); + } + return; + } + for (ManagementEventListener lstr : this.management.getManagementEventListeners()) { + try { + lstr.onAssociationDown(this); + } catch (Throwable ee) { + logger.error("Exception while invoking onAssociationDown", ee); + } + } + } + + /** + * @return the hostAddress + */ + public String getHostAddress() { + return hostAddress; + } + + /** + * @return the hostPort + */ + public int getHostPort() { + return hostPort; + } + + /** + * @return the peerAddress + */ + public String getPeerAddress() { + return peerAddress; + } + + /** + * @return the peerPort + */ + public int getPeerPort() { + return peerPort; + } + + /** + * @return the serverName + */ + public String getServerName() { + return null; + } + + @Override + public String[] getExtraHostAddresses() { + return extraHostAddresses; + } + + /** + * @param management the management to set + */ + public void setManagement(MultiManagementImpl management) { + this.management = management; + } + + /** + * @param socketChannel the socketChannel to set + */ + protected void setSocketChannel(AbstractSelectableChannel socketChannel) { + // + } + + protected void readPayload(PayloadData payload) { + if (payload == null) { + return; + } + + if (logger.isDebugEnabled()) { + logger.debug(String.format("Rx : Ass=%s %s", this.name, payload)); + } + + if (this.management.isSingleThread()) { + try { + this.associationListener.onPayload(this, payload); + } catch (Exception e) { + logger.error(String.format("Error while calling Listener for Association=%s.Payload=%s", this.name, payload), + e); + } + } else { + MultiWorker worker = new MultiWorker(this, this.associationListener, payload); + + ExecutorService executorService = this.management + .getExecutorService(this.workerThreadTable[payload.getStreamNumber()]); + try { + executorService.execute(worker); + } catch (RejectedExecutionException e) { + logger.error(String.format("Rejected %s as Executors is shutdown", payload), e); + } catch (NullPointerException e) { + logger.error(String.format("NullPointerException while submitting %s", payload), e); + } catch (Exception e) { + logger.error(String.format("Exception while submitting %s", payload), e); + } + } + } + + public void send(PayloadData payloadData) throws Exception { + if (!started.get()) { + throw new Exception("send failed: Association is not started"); + } + multiplexer.send(payloadData, this.msgInfo, this); + } + + protected boolean writePayload(PayloadData payloadData, boolean initMsg) { + try { + + if (txBuffer.hasRemaining()) { + // All data wasn't sent in last doWrite. Try to send it now + this.doSend(); + } + // TODO Do we need to synchronize ConcurrentLinkedQueue? + // synchronized (this.txQueue) { + if (!txBuffer.hasRemaining()) { + txBuffer.clear(); + if (logger.isDebugEnabled()) { + logger.debug(String.format("Tx : Ass=%s %s", this.name, payloadData)); + } + + // load ByteBuffer + // TODO: BufferOverflowException ? + txBuffer.put(payloadData.getData()); + + int seqControl = payloadData.getStreamNumber(); + + if (seqControl < 0 || seqControl >= this.associationHandler.getMaxOutboundStreams()) { + try { + // TODO : calling in same Thread. Is this ok? or + // dangerous? + this.associationListener.inValidStreamId(payloadData); + } catch (Exception e) { + logger.warn(e); + } + txBuffer.clear(); + txBuffer.flip(); + return false; + } + + if (initMsg) { + if (this.sctpAssociation != null) { + msgInfo = MessageInfo.createOutgoing(sctpAssociation, initSocketAddress, seqControl); + } else { + msgInfo = MessageInfo.createOutgoing(this.initSocketAddress, seqControl); + } + + } else { + if (this.sctpAssociation != null) { + msgInfo = MessageInfo.createOutgoing(sctpAssociation, peerSocketAddress, seqControl); + } else { + msgInfo = MessageInfo.createOutgoing(this.peerSocketAddress, seqControl); + } + } + msgInfo.payloadProtocolID(payloadData.getPayloadProtocolId()); + msgInfo.complete(payloadData.isComplete()); + msgInfo.unordered(payloadData.isUnordered()); + + logger.debug("write() - msgInfo: " + msgInfo); + txBuffer.flip(); + + this.doSend(); + + if (txBuffer.hasRemaining()) { + // Couldn't send all data. Lets return now and try to + // send + // this message in next cycle + return true; + } + return true; + } + return false; + } catch (IOException e) { + this.ioErrors++; + logger.error( + String.format("IOException while trying to write to underlying socket for Association=%s IOError count=%d", + this.name, this.ioErrors), + e); + logger.error("Internal send failed, retrying."); + this.close(); + onSendFailed(); + return false; + } catch (Exception ex) { + logger.error(String.format( + "Unexpected exception has been caught while trying to write SCTP socketChanel for Association=%s: %s", + this.name, ex.getMessage()), ex); + return false; + } + } + + private int doSend() throws IOException { + return multiplexer.getSocketMultiChannel().send(txBuffer, msgInfo); + } + + protected void reconnect() { + try { + doInitiateConnectionSctp(); + } catch (Exception ex) { + logger.warn("Error while trying to reconnect association[" + this.getName() + "]: " + ex.getMessage(), ex); + scheduleConnect(); + } + } + + protected void close() { + if (multiplexer != null) { + multiplexer.unregisterAssociation(this); + if (logger.isDebugEnabled()) { + logger.debug("close() - association=" + getName() + " is unregistered from the multiplexer"); + } + } + try { + this.markAssociationDown(); + this.associationListener.onCommunicationShutdown(this); + } catch (Exception e) { + logger.error(String.format( + "Exception while calling onCommunicationShutdown on AssociationListener for Association=%s", this.name), e); + } + } + + protected AbstractSelectableChannel getSocketChannel() { + if (this.multiplexer == null) { + return null; + } + return this.multiplexer.getSocketMultiChannel(); + } + + protected void scheduleConnect() { + if (!started.get()) { + logger.info("Association " + name + " is not started, no need to reconnect"); + return; + } + if (up.get()) { + logger.info("Associoation " + name + " is up, no need to reconnect"); + } else { + FastList pendingChanges = this.management.getPendingChanges(); + synchronized (pendingChanges) { + pendingChanges.add(new MultiChangeRequest(null, this, MultiChangeRequest.CONNECT, + System.currentTimeMillis() + this.management.getConnectDelay())); + } + } + } + + private void doInitiateConnectionSctp() throws IOException { + this.multiplexer = management.getMultiChannelController().register(this); + this.multiplexer.send(getInitPayloadData(), null, this, true); + } + + protected void createworkerThreadTable(int maximumBooundStream) { + this.workerThreadTable = new int[maximumBooundStream]; + this.management.populateWorkerThread(this.workerThreadTable); + } + + @Override + public String toString() { + return "OneToManyAssociationImpl [hostAddress=" + hostAddress + ", hostPort=" + hostPort + ", peerAddress=" + + peerAddress + ", peerPort=" + peerPort + ", name=" + name + ", extraHostAddresses=" + + Arrays.toString(extraHostAddresses) + ", secondaryPeerAddress=" + this.secondaryPeerAddress + ", type=" + + AssociationType.CLIENT + ", started=" + started + ", up=" + up + ", management=" + management + ", msgInfo=" + + msgInfo + ", sctpAssociation=" + sctpAssociation + ", ipChannelType=" + ipChannelType + ", assocInfo=" + + assocInfo + ", multiplexer=" + multiplexer + ", ioErrors=" + ioErrors + "]"; + } + + /** + * XML Serialization/Deserialization + */ + protected static final XMLFormat ASSOCIATION_XML = new XMLFormat( + OneToManyAssociationImpl.class) { + + @Override + public void read(javolution.xml.XMLFormat.InputElement xml, OneToManyAssociationImpl association) + throws XMLStreamException { + association.name = xml.getAttribute(NAME, ""); + + association.hostAddress = xml.getAttribute(HOST_ADDRESS, ""); + association.hostPort = xml.getAttribute(HOST_PORT, 0); + + association.peerAddress = xml.getAttribute(PEER_ADDRESS, ""); + association.peerPort = xml.getAttribute(PEER_PORT, 0); + + // association.serverName = xml.getAttribute(SERVER_NAME, ""); + + int extraHostAddressesSize = xml.getAttribute(EXTRA_HOST_ADDRESS_SIZE, 0); + association.extraHostAddresses = new String[extraHostAddressesSize]; + + for (int i = 0; i < extraHostAddressesSize; i++) { + association.extraHostAddresses[i] = xml.get(EXTRA_HOST_ADDRESS, String.class); + } + try { + association.initDerivedFields(); + } catch (IOException e) { + logger.error("Unable to load association from XML: error while calculating derived fields", e); + } + } + + @Override + public void write(OneToManyAssociationImpl association, javolution.xml.XMLFormat.OutputElement xml) + throws XMLStreamException { + xml.setAttribute(NAME, association.name); + xml.setAttribute(ASSOCIATION_TYPE, AssociationType.CLIENT.getType()); + xml.setAttribute(HOST_ADDRESS, association.hostAddress); + xml.setAttribute(HOST_PORT, association.hostPort); + + xml.setAttribute(PEER_ADDRESS, association.peerAddress); + xml.setAttribute(PEER_PORT, association.peerPort); + + xml.setAttribute(SERVER_NAME, null); + xml.setAttribute(IPCHANNEL_TYPE, association.ipChannelType.getCode()); + + xml.setAttribute(EXTRA_HOST_ADDRESS_SIZE, + association.extraHostAddresses != null ? association.extraHostAddresses.length : 0); + if (association.extraHostAddresses != null) { + for (String s : association.extraHostAddresses) { + xml.add(s, EXTRA_HOST_ADDRESS, String.class); + } + } + } + }; + + protected void onSendFailed() { + // if started and down then it means it is a CANT_START event and scheduleConnect must be called. + if (started.get() && !up.get()) { + logger.warn("Association=" + getName() + " CANT_START, trying to reconnect..."); + switchInitSocketAddress(); + scheduleConnect(); + } + } + + @Override + public void acceptAnonymousAssociation(AssociationListener associationListener) throws Exception { + throw new UnsupportedOperationException(this.getClass() + " class does not implement SERVER type Associations!"); + } + + @Override + public void rejectAnonymousAssociation() { + throw new UnsupportedOperationException(this.getClass() + " class does not implement SERVER type Associations!"); + } + + @Override + public void stopAnonymousAssociation() throws Exception { + throw new UnsupportedOperationException(this.getClass() + " class does not implement SERVER type Associations!"); + } +} diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToOneAssociationHandler.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToOneAssociationHandler.java new file mode 100644 index 0000000..645a9f1 --- /dev/null +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToOneAssociationHandler.java @@ -0,0 +1,186 @@ +package org.mobicents.protocols.sctp.multiclient; + +import org.apache.log4j.Logger; +import org.apache.log4j.Priority; + +import com.sun.nio.sctp.AbstractNotificationHandler; +import com.sun.nio.sctp.AssociationChangeNotification; +import com.sun.nio.sctp.HandlerResult; +import com.sun.nio.sctp.Notification; +import com.sun.nio.sctp.PeerAddressChangeNotification; +import com.sun.nio.sctp.SendFailedNotification; +import com.sun.nio.sctp.ShutdownNotification; + +/** + * Handles notifications for OneToOneAssociationImpl objects. + * + * @author balogh.gabor@alerant.hu + * + */ +@SuppressWarnings("restriction") +public class OneToOneAssociationHandler extends AbstractNotificationHandler { + + private static final Logger logger = Logger.getLogger(OneToOneAssociationHandler.class); + + private volatile int maxInboundStreams = 1; + private volatile int maxOutboundStreams = 1; + + public OneToOneAssociationHandler() { + + } + + /** + * @return the maxInboundStreams + */ + public int getMaxInboundStreams() { + return maxInboundStreams; + } + + /** + * @return the maxOutboundStreams + */ + public int getMaxOutboundStreams() { + return maxOutboundStreams; + } + + @Override + public HandlerResult handleNotification(Notification arg0, OneToOneAssociationImpl arg1) { + if (arg0 instanceof AssociationChangeNotification) { + return handleNotification((AssociationChangeNotification) arg0, arg1); + } + if (arg0 instanceof ShutdownNotification) { + return handleNotification((ShutdownNotification) arg0, arg1); + } + if (arg0 instanceof SendFailedNotification) { + return handleNotification((SendFailedNotification) arg0, arg1); + } + if (arg0 instanceof PeerAddressChangeNotification) { + return handleNotification((PeerAddressChangeNotification) arg0, arg1); + } + return super.handleNotification(arg0, arg1); + } + + @Override + public HandlerResult handleNotification(AssociationChangeNotification not, OneToOneAssociationImpl association) { + + switch (not.event()) { + case COMM_UP: + // in case when comm is go online but the association has been already stopped COMM_UP event is sinked. + if (!association.isStarted()) { + association.silentlyShutdown(); + return HandlerResult.CONTINUE; + } + if (not.association() != null) { + this.maxOutboundStreams = not.association().maxOutboundStreams(); + this.maxInboundStreams = not.association().maxInboundStreams(); + } + + if (logger.isInfoEnabled()) { + logger.info(String.format( + "New association setup for Association=%s with %d outbound streams, and %d inbound streams, sctp assoc is %s.\n", + association.getName(), this.maxOutboundStreams, this.maxInboundStreams, not.association())); + } + + association.createworkerThreadTable(Math.max(this.maxInboundStreams, this.maxOutboundStreams)); + + try { + association.markAssociationUp(); + association.getAssociationListener().onCommunicationUp(association, this.maxInboundStreams, + this.maxOutboundStreams); + } catch (Exception e) { + logger.error( + String.format("Exception while calling onCommunicationUp on AssociationListener for Association=%s", + association.getName()), + e); + } + return HandlerResult.CONTINUE; + case CANT_START: + logger.error(String.format("Can't start for Association=%s", association.getName())); + // Close the Socket + association.close(); + association.switchInitSocketAddress(); + association.scheduleConnect(); + return HandlerResult.CONTINUE; + case COMM_LOST: + logger.warn(String.format("Communication lost for Association=%s", association.getName())); + + // Close the Socket + association.close(); + + association.scheduleConnect(); + try { + association.markAssociationDown(); + association.getAssociationListener().onCommunicationLost(association); + } catch (Exception e) { + logger.error(String.format( + "Exception while calling onCommunicationLost on AssociationListener for Association=%s", + association.getName()), e); + } + return HandlerResult.RETURN; + case RESTART: + logger.warn(String.format("Restart for Association=%s", association.getName())); + try { + association.getAssociationListener().onCommunicationRestart(association); + } catch (Exception e) { + logger.error(String.format( + "Exception while calling onCommunicationRestart on AssociationListener for Association=%s", + association.getName()), e); + } + return HandlerResult.CONTINUE; + case SHUTDOWN: + if (logger.isInfoEnabled()) { + logger.info(String.format("Shutdown for Association=%s", association.getName())); + } + try { + association.markAssociationDown(); + association.getAssociationListener().onCommunicationShutdown(association); + association.scheduleConnect(); + } catch (Exception e) { + logger.error(String.format( + "Exception while calling onCommunicationShutdown on AssociationListener for Association=%s", + association.getName()), e); + } + return HandlerResult.RETURN; + default: + logger.warn(String.format("Received unkown Event=%s for Association=%s", not.event(), association.getName())); + break; + } + return HandlerResult.CONTINUE; + } + + @Override + public HandlerResult handleNotification(ShutdownNotification not, OneToOneAssociationImpl associtaion) { + if (logger.isInfoEnabled()) { + logger.info(String.format("Association=%s SHUTDOWN", associtaion.getName())); + } + + try { + associtaion.markAssociationDown(); + associtaion.getAssociationListener().onCommunicationShutdown(associtaion); + } catch (Exception e) { + logger.error( + String.format("Exception while calling onCommunicationShutdown on AssociationListener for Association=%s", + associtaion.getName()), + e); + } + + return HandlerResult.RETURN; + } + + @Override + public HandlerResult handleNotification(SendFailedNotification notification, OneToOneAssociationImpl associtaion) { + logger.error(String.format( + "Association=" + associtaion.getName() + " SendFailedNotification, errorCode=" + notification.errorCode())); + associtaion.onSendFailed(); + return HandlerResult.RETURN; + } + + @Override + public HandlerResult handleNotification(PeerAddressChangeNotification notification, OneToOneAssociationImpl associtaion) { + if (logger.isEnabledFor(Priority.INFO)) { + logger.info(String.format("Peer Address changed to=%s for Association=%s", notification.address(), + associtaion.getName())); + } + return HandlerResult.CONTINUE; + } +} diff --git a/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToOneAssociationImpl.java b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToOneAssociationImpl.java new file mode 100644 index 0000000..c8ffece --- /dev/null +++ b/sctp-impl/src/main/java/org/mobicents/protocols/sctp/multiclient/OneToOneAssociationImpl.java @@ -0,0 +1,859 @@ +package org.mobicents.protocols.sctp.multiclient; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.spi.AbstractSelectableChannel; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +import javolution.util.FastList; +import javolution.xml.XMLFormat; +import javolution.xml.stream.XMLStreamException; + +import org.apache.log4j.Logger; +import org.mobicents.protocols.api.AssociationListener; +import org.mobicents.protocols.api.AssociationType; +import org.mobicents.protocols.api.IpChannelType; +import org.mobicents.protocols.api.ManagementEventListener; +import org.mobicents.protocols.api.PayloadData; + +import com.sun.nio.sctp.MessageInfo; +import com.sun.nio.sctp.SctpChannel; + +/** + * Implements a one-to-one type ManagableAssociation. Used when associations is peeled off the sctp multi channel sockets to a + * separate sctp socket channel. + * + * @author amit bhayani + * @author balogh.gabor@alerant.hu + * + */ +@SuppressWarnings("restriction") +public class OneToOneAssociationImpl extends ManageableAssociation { + + protected static final Logger logger = Logger.getLogger(OneToOneAssociationImpl.class.getName()); + + private static final String NAME = "name"; + private static final String SERVER_NAME = "serverName"; + private static final String HOST_ADDRESS = "hostAddress"; + private static final String HOST_PORT = "hostPort"; + + private static final String PEER_ADDRESS = "peerAddress"; + private static final String PEER_PORT = "peerPort"; + + private static final String ASSOCIATION_TYPE = "assoctype"; + private static final String IPCHANNEL_TYPE = "ipChannelType"; + private static final String EXTRA_HOST_ADDRESS = "extraHostAddress"; + private static final String EXTRA_HOST_ADDRESS_SIZE = "extraHostAddresseSize"; + + private AssociationType type; + + private AssociationListener associationListener = null; + + protected final OneToOneAssociationHandler associationHandler = new OneToOneAssociationHandler(); + + // Is the Association been started by management? + private AtomicBoolean started = new AtomicBoolean(false); + // Is the Association up (connection is established) + protected AtomicBoolean up = new AtomicBoolean(false); + + private int workerThreadTable[] = null; + + private ConcurrentLinkedQueue txQueue = new ConcurrentLinkedQueue(); + + private SctpChannel socketChannelSctp; + + // The buffer into which we'll read data when it's available + private ByteBuffer rxBuffer = ByteBuffer.allocateDirect(8192); + private ByteBuffer txBuffer = ByteBuffer.allocateDirect(8192); + + private volatile MessageInfo msgInfo; + + private OneToManyAssocMultiplexer multiplexer; + + /** + * Count of number of IO Errors occured. If this exceeds the maxIOErrors set in Management, socket will be closed and + * request to reopen the cosket will be initiated + */ + private volatile int ioErrors = 0; + + public OneToOneAssociationImpl() { + this.type = AssociationType.CLIENT; + // clean transmission buffer + txBuffer.clear(); + txBuffer.rewind(); + txBuffer.flip(); + + // clean receiver buffer + rxBuffer.clear(); + rxBuffer.rewind(); + rxBuffer.flip(); + } + + /** + * Creating a CLIENT Association + * + * @param hostAddress + * @param hostPort + * @param peerAddress + * @param peerPort + * @param assocName + * @param ipChannelType + * @param extraHostAddresses + * @throws IOException + */ + public OneToOneAssociationImpl(String hostAddress, int hostPort, String peerAddress, int peerPort, String assocName, + String[] extraHostAddresses) throws IOException { + this(hostAddress, hostPort, peerAddress, peerPort, assocName, extraHostAddresses, null); + } + + public OneToOneAssociationImpl(String hostAddress, int hostPort, String peerAddress, int peerPort, String assocName, + String[] extraHostAddresses, String secondaryPeerAddress) throws IOException { + super(hostAddress, hostPort, peerAddress, peerPort, assocName, extraHostAddresses, secondaryPeerAddress); + this.type = AssociationType.CLIENT; + // clean transmission buffer + txBuffer.clear(); + txBuffer.rewind(); + txBuffer.flip(); + + // clean receiver buffer + rxBuffer.clear(); + rxBuffer.rewind(); + rxBuffer.flip(); + } + + protected void start() throws Exception { + + if (this.associationListener == null) { + throw new NullPointerException(String.format("AssociationListener is null for Associatoion=%s", this.name)); + } + + if (started.getAndSet(true)) { + logger.warn("Association: " + this + " has been already STARTED"); + return; + } + + scheduleConnect(); + + for (ManagementEventListener lstr : this.management.getManagementEventListeners()) { + try { + lstr.onAssociationStarted(this); + } catch (Throwable ee) { + logger.error("Exception while invoking onAssociationStarted", ee); + } + } + } + + /** + * Stops this Association. If the underlying SctpChannel is open, marks the channel for close + */ + protected void stop() throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("stopped called on association=" + this); + } + if (!started.getAndSet(false)) { + logger.warn("Association: " + this + " has been already STOPPED"); + return; + } + for (ManagementEventListener lstr : this.management.getManagementEventListeners()) { + try { + lstr.onAssociationStopped(this); + } catch (Throwable ee) { + logger.error("Exception while invoking onAssociationStopped", ee); + } + } + + if (this.getSocketChannel() != null && this.getSocketChannel().isOpen()) { + FastList pendingChanges = this.management.getPendingChanges(); + synchronized (pendingChanges) { + // Indicate we want the interest ops set changed + pendingChanges.add(new MultiChangeRequest(getSocketChannel(), null, this, MultiChangeRequest.CLOSE, -1)); + } + + // Finally, wake up our selecting thread so it can make the required + // changes + this.management.getSocketSelector().wakeup(); + } else if (multiplexer != null) { + multiplexer.unregisterAssociation(this); + if (logger.isDebugEnabled()) { + logger.debug("close() - association=" + getName() + " is unregistered from the multiplexer"); + } + } + } + public IpChannelType getIpChannelType() { + return IpChannelType.SCTP; + } + + /** + * @return the associationListener + */ + public AssociationListener getAssociationListener() { + return associationListener; + } + + /** + * @param associationListener the associationListener to set + */ + public void setAssociationListener(AssociationListener associationListener) { + this.associationListener = associationListener; + } + + /** + * @return the assocName + */ + public String getName() { + return name; + } + + /** + * @return the associationType + */ + public AssociationType getAssociationType() { + return AssociationType.CLIENT; + } + + /** + * @return the started + */ + @Override + public boolean isStarted() { + return started.get(); + } + + @Override + public boolean isConnected() { + return started.get() && up.get(); + } + + @Override + public boolean isUp() { + return up.get(); + } + + protected void markAssociationUp() { + if (up.getAndSet(true)) { + logger.debug("Association: " + this + " has been already marked UP"); + return; + } + + for (ManagementEventListener lstr : this.management.getManagementEventListeners()) { + try { + lstr.onAssociationUp(this); + } catch (Throwable ee) { + logger.error("Exception while invoking onAssociationUp", ee); + } + } + } + + protected void markAssociationDown() { + if (!up.getAndSet(false)) { + logger.debug("Association: " + this + " has been already marked DOWN"); + return; + } + for (ManagementEventListener lstr : this.management.getManagementEventListeners()) { + try { + lstr.onAssociationDown(this); + } catch (Throwable ee) { + logger.error("Exception while invoking onAssociationDown", ee); + } + } + } + + /** + * @return the hostAddress + */ + public String getHostAddress() { + return hostAddress; + } + + /** + * @return the hostPort + */ + public int getHostPort() { + return hostPort; + } + + /** + * @return the peerAddress + */ + public String getPeerAddress() { + return peerAddress; + } + + /** + * @return the peerPort + */ + public int getPeerPort() { + return peerPort; + } + + /** + * @return the serverName + */ + public String getServerName() { + return null; + } + + @Override + public String[] getExtraHostAddresses() { + return extraHostAddresses; + } + + /** + * @param management the management to set + */ + protected void setManagement(MultiManagementImpl management) { + this.management = management; + } + + protected AbstractSelectableChannel getSocketChannel() { + return this.socketChannelSctp; + } + + protected void reconnect() { + try { + doInitiateConnectionSctp(); + } catch (Exception ex) { + logger.warn("Error while trying to reconnect association[" + this.getName() + "]: " + ex.getMessage(), ex); + scheduleConnect(); + } + } + + /** + * @param socketChannel the socketChannel to set + */ + protected void setSocketChannel(AbstractSelectableChannel socketChannel) { + this.socketChannelSctp = (SctpChannel) socketChannel; + } + + public void send(PayloadData payloadData) throws Exception { + try { + this.checkSocketIsOpen(); + + FastList pendingChanges = this.management.getPendingChanges(); + synchronized (pendingChanges) { + // Indicate we want the interest ops set changed + pendingChanges.add(new MultiChangeRequest(this.getSocketChannel(), null, this, MultiChangeRequest.CHANGEOPS, + SelectionKey.OP_WRITE)); + // And queue the data we want written + // TODO Do we need to synchronize ConcurrentLinkedQueue ? + // synchronized (this.txQueue) { + this.txQueue.add(payloadData); + } + // Finally, wake up our selecting thread so it can make the required + // changes + this.management.getSocketSelector().wakeup(); + } catch (Exception ex) { + logger.error("Error while sending payload data: " + ex.getMessage(), ex); + } + } + + private void checkSocketIsOpen() throws Exception { + if (!started.get() || this.socketChannelSctp == null || !this.socketChannelSctp.isOpen() + || this.socketChannelSctp.association() == null) { + logger.warn(String.format("Underlying sctp channel doesn't open or doesn't have association for Association=%s", + this.name)); + throw new Exception(String + .format("Underlying sctp channel doesn't open or doesn't have association for Association=%s", this.name)); + } + } + + protected void read() { + try { + PayloadData payload; + + payload = this.doReadSctp(); + + if (payload == null) + return; + + if (logger.isDebugEnabled()) { + logger.debug(String.format("Rx : Ass=%s %s", this.name, payload)); + } + + if (this.management.isSingleThread()) { + // If single thread model the listener should be called in the + // selector thread itself + try { + this.associationListener.onPayload(this, payload); + } catch (Exception e) { + logger.error( + String.format("Error while calling Listener for Association=%s.Payload=%s", this.name, payload), e); + } + } else { + + MultiWorker worker = new MultiWorker(this, this.associationListener, payload); + + ExecutorService executorService = this.management + .getExecutorService(this.workerThreadTable[payload.getStreamNumber()]); + try { + executorService.execute(worker); + } catch (RejectedExecutionException e) { + logger.error(String.format("Rejected %s as Executors is shutdown", payload), e); + } catch (NullPointerException e) { + logger.error(String.format("NullPointerException while submitting %s", payload), e); + } catch (Exception e) { + logger.error(String.format("Exception while submitting %s", payload), e); + } + } + } catch (IOException e) { + this.ioErrors++; + logger.error( + String.format("IOException while trying to read from underlying socket for Association=%s IOError count=%d", + this.name, this.ioErrors), + e); + + if (this.ioErrors > this.management.getMaxIOErrors()) { + // Close this socket + this.close(); + + // retry to connect after delay + this.scheduleConnect(); + } + } + } + + private PayloadData doReadSctp() throws IOException { + rxBuffer.clear(); + MessageInfo messageInfo = this.socketChannelSctp.receive(rxBuffer, this, this.associationHandler); + + if (messageInfo == null) { + if (logger.isDebugEnabled()) { + logger.debug(String.format(" messageInfo is null for Association=%s", this.name)); + } + return null; + } + + int len = messageInfo.bytes(); + if (len == -1) { + logger.error(String.format("Rx -1 while trying to read from underlying socket for Association=%s ", this.name)); + this.close(); + this.scheduleConnect(); + return null; + } + + rxBuffer.flip(); + byte[] data = new byte[len]; + rxBuffer.get(data); + rxBuffer.clear(); + + PayloadData payload = new PayloadData(len, data, messageInfo.isComplete(), messageInfo.isUnordered(), + messageInfo.payloadProtocolID(), messageInfo.streamNumber()); + + return payload; + } + + protected void write(SelectionKey key) { + try { + + if (txBuffer.hasRemaining()) { + // All data wasn't sent in last doWrite. Try to send it now + this.doSend(); + } + + // TODO Do we need to synchronize ConcurrentLinkedQueue? + // synchronized (this.txQueue) { + if (!txQueue.isEmpty() && !txBuffer.hasRemaining()) { + while (!txQueue.isEmpty()) { + // Lets read all the messages in txQueue and send + + txBuffer.clear(); + PayloadData payloadData = txQueue.poll(); + + if (logger.isDebugEnabled()) { + logger.debug(String.format("Tx : Ass=%s %s", this.name, payloadData)); + } + + // load ByteBuffer + // TODO: BufferOverflowException ? + txBuffer.put(payloadData.getData()); + + int seqControl = payloadData.getStreamNumber(); + + if (seqControl < 0 || seqControl >= this.associationHandler.getMaxOutboundStreams()) { + try { + // TODO : calling in same Thread. Is this ok? or + // dangerous? + this.associationListener.inValidStreamId(payloadData); + } catch (Exception e) { + + } + txBuffer.clear(); + txBuffer.flip(); + continue; + } + + msgInfo = MessageInfo.createOutgoing(this.peerSocketAddress, seqControl); + msgInfo.payloadProtocolID(payloadData.getPayloadProtocolId()); + msgInfo.complete(payloadData.isComplete()); + msgInfo.unordered(payloadData.isUnordered()); + + txBuffer.flip(); + + this.doSend(); + + if (txBuffer.hasRemaining()) { + // Couldn't send all data. Lets return now and try to + // send + // this message in next cycle + return; + } + + } // end of while + } + + if (txQueue.isEmpty()) { + // We wrote away all data, so we're no longer interested + // in writing on this socket. Switch back to waiting for + // data. + key.interestOps(SelectionKey.OP_READ); + } + + } catch (IOException e) { + this.ioErrors++; + logger.error( + String.format("IOException while trying to write to underlying socket for Association=%s IOError count=%d", + this.name, this.ioErrors), + e); + + if (this.ioErrors > this.management.getMaxIOErrors()) { + // Close this socket + this.close(); + + // retry to connect after delay + this.scheduleConnect(); + } + } // try-catch + } + + private int doSend() throws IOException { + return this.socketChannelSctp.send(txBuffer, msgInfo); + } + + protected boolean isOpen() { + return this.getSocketChannel() != null && this.getSocketChannel().isOpen(); + } + + protected void close() { + if (this.getSocketChannel() != null) { + try { + this.getSocketChannel().close(); + if (logger.isDebugEnabled()) { + logger.debug("close() - socketChannel is closed for association=" + getName()); + } + } catch (Exception e) { + logger.error(String.format("Exception while closing the SctpScoket for Association=%s", this.name), e); + } + } + if (this.up.get()) { + try { + this.markAssociationDown(); + this.associationListener.onCommunicationShutdown(this); + } catch (Exception e) { + logger.error(String.format( + "Exception while calling onCommunicationShutdown on AssociationListener for Association=%s", this.name), + e); + } + } + if (multiplexer != null) { + multiplexer.unregisterAssociation(this); + if (logger.isDebugEnabled()) { + logger.debug("close() - association=" + getName() + " is unregistered from the multiplexer"); + } + } + // Finally clear the txQueue + if (this.txQueue.size() > 0) { + logger.warn(String.format("Clearig txQueue for Association=%s. %d messages still pending will be cleared", + this.name, this.txQueue.size())); + } + this.txQueue.clear(); + } + + protected void scheduleConnect() { + if (this.getAssociationType() == AssociationType.CLIENT) { + FastList pendingChanges = this.management.getPendingChanges(); + synchronized (pendingChanges) { + pendingChanges.add(new MultiChangeRequest(null, this, MultiChangeRequest.CONNECT, + System.currentTimeMillis() + this.management.getConnectDelay())); + } + } + } + + protected void branch(SctpChannel sctpChannel, MultiManagementImpl management) { + // if association is stopped, channel wont be registered. + if (!started.get()) { + if (logger.isInfoEnabled()) { + logger.info("Branching a stopped association, channel wont be registered to the selector."); + } + // set channel to able to close later + this.socketChannelSctp = sctpChannel; + this.management = management; + } else { + FastList pendingChanges = this.management.getPendingChanges(); + synchronized (pendingChanges) { + // setting the channel must be synchronized + this.socketChannelSctp = sctpChannel; + this.management = management; + pendingChanges.add(new MultiChangeRequest(sctpChannel, null, this, MultiChangeRequest.REGISTER, + SelectionKey.OP_WRITE | SelectionKey.OP_READ)); + } + } + ; + } + + private void doInitiateConnectionSctp() throws IOException { + // reset the ioErrors + this.ioErrors = 0; + this.multiplexer = management.getMultiChannelController().register(this); + this.multiplexer.send(getInitPayloadData(), null, this, true); + } + + protected void createworkerThreadTable(int maximumBooundStream) { + this.workerThreadTable = new int[maximumBooundStream]; + this.management.populateWorkerThread(this.workerThreadTable); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + + StringBuilder sb = new StringBuilder(); + sb.append("Association [name=").append(this.name).append(", started=").append(started.get()).append(", up=").append(up) + .append(", associationType=").append(this.type).append(", ipChannelType=").append("SCTP") + .append(", hostAddress=").append(this.hostAddress).append(", hostPort=").append(this.hostPort) + .append(", peerAddress=").append(this.peerAddress).append(", peerPort=").append(this.peerPort) + .append(", serverName=").append(""); + + sb.append(", extraHostAddress=["); + + if (this.extraHostAddresses != null) { + for (int i = 0; i < this.extraHostAddresses.length; i++) { + String extraHostAddress = this.extraHostAddresses[i]; + sb.append(extraHostAddress); + sb.append(", "); + } + } + sb.append(" secondaryPeerAddress=").append(this.secondaryPeerAddress); + sb.append("]]"); + + return sb.toString(); + } + + /** + * XML Serialization/Deserialization + */ + protected static final XMLFormat ASSOCIATION_XML = new XMLFormat( + OneToOneAssociationImpl.class) { + + @Override + public void read(javolution.xml.XMLFormat.InputElement xml, OneToOneAssociationImpl association) + throws XMLStreamException { + association.name = xml.getAttribute(NAME, ""); + association.hostAddress = xml.getAttribute(HOST_ADDRESS, ""); + association.hostPort = xml.getAttribute(HOST_PORT, 0); + + association.peerAddress = xml.getAttribute(PEER_ADDRESS, ""); + association.peerPort = xml.getAttribute(PEER_PORT, 0); + + int extraHostAddressesSize = xml.getAttribute(EXTRA_HOST_ADDRESS_SIZE, 0); + association.extraHostAddresses = new String[extraHostAddressesSize]; + + for (int i = 0; i < extraHostAddressesSize; i++) { + association.extraHostAddresses[i] = xml.get(EXTRA_HOST_ADDRESS, String.class); + } + try { + association.initDerivedFields(); + } catch (IOException e) { + logger.error("Unable to load association from XML: error while calculating derived fields", e); + } + } + + @Override + public void write(OneToOneAssociationImpl association, javolution.xml.XMLFormat.OutputElement xml) + throws XMLStreamException { + xml.setAttribute(NAME, association.name); + xml.setAttribute(ASSOCIATION_TYPE, association.type.getType()); + xml.setAttribute(HOST_ADDRESS, association.hostAddress); + xml.setAttribute(HOST_PORT, association.hostPort); + + xml.setAttribute(PEER_ADDRESS, association.peerAddress); + xml.setAttribute(PEER_PORT, association.peerPort); + + xml.setAttribute(SERVER_NAME, ""); + xml.setAttribute(IPCHANNEL_TYPE, IpChannelType.SCTP); + + xml.setAttribute(EXTRA_HOST_ADDRESS_SIZE, + association.extraHostAddresses != null ? association.extraHostAddresses.length : 0); + if (association.extraHostAddresses != null) { + for (String s : association.extraHostAddresses) { + xml.add(s, EXTRA_HOST_ADDRESS, String.class); + } + } + } + }; + + @Override + protected void readPayload(PayloadData payload) { + if (payload == null) { + return; + } + + if (logger.isDebugEnabled()) { + logger.debug(String.format("Rx : Ass=%s %s", this.name, payload)); + } + + if (this.management.isSingleThread()) { + try { + this.associationListener.onPayload(this, payload); + } catch (Exception e) { + logger.error(String.format("Error while calling Listener for Association=%s.Payload=%s", this.name, payload), + e); + } + } else { + MultiWorker worker = new MultiWorker(this, this.associationListener, payload); + ExecutorService executorService = this.management + .getExecutorService(this.workerThreadTable[payload.getStreamNumber()]); + try { + executorService.execute(worker); + } catch (RejectedExecutionException e) { + logger.error(String.format("Rejected %s as Executors is shutdown", payload), e); + } catch (NullPointerException e) { + logger.error(String.format("NullPointerException while submitting %s", payload), e); + } catch (Exception e) { + logger.error(String.format("Exception while submitting %s", payload), e); + } + } + } + + @Override + protected boolean writePayload(PayloadData payloadData, boolean initMsg) { + try { + + if (txBuffer.hasRemaining()) { + multiplexer.getSocketMultiChannel().send(txBuffer, msgInfo); + } + // TODO Do we need to synchronize ConcurrentLinkedQueue? + // synchronized (this.txQueue) { + if (!txBuffer.hasRemaining()) { + txBuffer.clear(); + if (logger.isDebugEnabled()) { + logger.debug(String.format("Tx : Ass=%s %s", this.name, payloadData)); + } + + // load ByteBuffer + // TODO: BufferOverflowException ? + txBuffer.put(payloadData.getData()); + + int seqControl = payloadData.getStreamNumber(); + + if (seqControl < 0 || seqControl >= this.associationHandler.getMaxOutboundStreams()) { + try { + // TODO : calling in same Thread. Is this ok? or + // dangerous? + this.associationListener.inValidStreamId(payloadData); + } catch (Exception e) { + logger.warn(e); + } + txBuffer.clear(); + txBuffer.flip(); + return false; + } + + if (initMsg) { + msgInfo = MessageInfo.createOutgoing(this.initSocketAddress, seqControl); + } else { + msgInfo = MessageInfo.createOutgoing(this.peerSocketAddress, seqControl); + } + + msgInfo.payloadProtocolID(payloadData.getPayloadProtocolId()); + msgInfo.complete(payloadData.isComplete()); + msgInfo.unordered(payloadData.isUnordered()); + + logger.debug("write() - msgInfo: " + msgInfo); + txBuffer.flip(); + + multiplexer.getSocketMultiChannel().send(txBuffer, msgInfo); + + if (txBuffer.hasRemaining()) { + // Couldn't send all data. Lets return now and try to + // send + // this message in next cycle + return true; + } + return true; + } + return false; + } catch (IOException e) { + this.ioErrors++; + logger.error( + String.format("IOException while trying to write to underlying socket for Association=%s IOError count=%d", + this.name, this.ioErrors), + e); + logger.error("Internal send failed, retrying."); + this.close(); + onSendFailed(); + return false; + } catch (Exception ex) { + logger.error(String.format( + "Unexpected exception has been caught while trying to write SCTP socketChanel for Association=%s: %s", + this.name, ex.getMessage()), ex); + return false; + } + } + + protected void onSendFailed() { + // if started and down then it means it is a CANT_START event and scheduleConnect must be called. + if (started.get() && !up.get()) { + logger.warn("Association=" + getName() + " CANT_START, trying to reconnect..."); + switchInitSocketAddress(); + scheduleConnect(); + } + } + + // called when COMM_UP event arrived after association was stopped. + protected void silentlyShutdown() { + if (!started.get()) { + if (logger.isInfoEnabled()) { + logger.info("Association=" + getName() + + " has been already stopped when COMM_UP event arrived, closing sctp association without notifying any listeners."); + } + if (this.getSocketChannel() != null) { + try { + this.getSocketChannel().close(); + if (logger.isDebugEnabled()) { + logger.debug("close() - socketChannel is closed for association=" + getName()); + } + } catch (Exception e) { + logger.error(String.format("Exception while closing the SctpScoket for Association=%s", this.name), e); + } + } + } + } + + @Override + public void acceptAnonymousAssociation(AssociationListener associationListener) throws Exception { + throw new UnsupportedOperationException(this.getClass() + " class does not implement SERVER type Associations!"); + } + + @Override + public void rejectAnonymousAssociation() { + throw new UnsupportedOperationException(this.getClass() + " class does not implement SERVER type Associations!"); + } + + @Override + public void stopAnonymousAssociation() throws Exception { + throw new UnsupportedOperationException(this.getClass() + " class does not implement SERVER type Associations!"); + } + + void onCantStart() { + this.close(); + + } +}