Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions jpos/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
exports org.jpos.log.render.markdown;
exports org.jpos.log.evt;
exports org.jpos.core.annotation;
exports org.jpos.metrics;
exports org.jpos.metrics.iso;

uses org.jpos.core.EnvironmentProvider;
uses org.jpos.log.LogRenderer;
Expand Down
105 changes: 40 additions & 65 deletions jpos/src/main/java/org/jpos/iso/BaseChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

package org.jpos.iso;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import org.jpos.core.Configurable;
import org.jpos.core.Configuration;
import org.jpos.core.ConfigurationException;
Expand All @@ -31,8 +28,8 @@
import org.jpos.jfr.ChannelEvent;
import org.jpos.log.evt.Connect;
import org.jpos.log.evt.Disconnect;
import org.jpos.metrics.MeterFactory;
import org.jpos.metrics.MeterInfo;
import org.jpos.metrics.iso.ISOMsgMetrics;
import org.jpos.util.*;

import javax.net.ssl.SSLSocket;
Expand All @@ -45,7 +42,7 @@

/*
* BaseChannel was ISOChannel. Now ISOChannel is an interface
* Revision: 1.34 Date: 2000/04/08 23:54:55
* Revision: 1.34 Date: 2000/04/08 23:54:55
*/

/**
Expand All @@ -61,7 +58,7 @@
* It now support the new Logger architecture so we will
* probably setup ISOChannelPanel to be a LogListener instead
* of being an Observer in future releases.
*
*
* @author Alejandro P. Revilla
* @author Bharavi Gade
* @version $Revision$ $Date$
Expand All @@ -74,7 +71,7 @@
*/
@SuppressWarnings("unchecked")
public abstract class BaseChannel extends Observable
implements FilteredChannel, ClientChannel, ServerChannel, FactoryChannel,
implements FilteredChannel, ClientChannel, ServerChannel, FactoryChannel, ISOMsgMetrics.Source,
LogSource, Configurable, BaseChannelMBean, Cloneable, ExceptionHandlerAware
{
private Socket socket;
Expand Down Expand Up @@ -114,10 +111,8 @@ public abstract class BaseChannel extends Observable
private boolean roundRobin = false;
private boolean debugIsoError = true;

private Counter msgOutCounter;
private Counter msgInCounter;
private MeterRegistry meterRegistry;
private String serverName;
private ISOMsgMetrics isoMsgMetrics;

private final UUID uuid;

private final Map<Class<? extends Exception>, List<ExceptionHandler>> exceptionHandlers = new HashMap<>();
Expand All @@ -131,8 +126,8 @@ public BaseChannel () {
uuid = UUID.randomUUID();
cnt = new int[SIZEOF_CNT];
name = "";
incomingFilters = new ArrayList();
outgoingFilters = new ArrayList();
incomingFilters = new ArrayList<>();
outgoingFilters = new ArrayList<>();
setHost(null, 0);
}

Expand Down Expand Up @@ -166,9 +161,7 @@ public BaseChannel (ISOPackager p) throws IOException {
* @exception IOException on error
* @see ISOPackager
*/
public BaseChannel (ISOPackager p, ServerSocket serverSocket)
throws IOException
{
public BaseChannel (ISOPackager p, ServerSocket serverSocket) throws IOException {
this();
setPackager (p);
setServerSocket (serverSocket);
Expand All @@ -185,7 +178,7 @@ public void setHost(String host, int port) {
this.hosts = new String[] { host };
this.ports = new int[] { port };
}

/**
* initialize an ISOChannel
* @param iface server TCP Address
Expand All @@ -195,7 +188,7 @@ public void setLocalAddress (String iface, int port) {
this.localIface = iface;
this.localPort = port;
}


/**
* @param host to connect (client ISOChannel)
Expand Down Expand Up @@ -274,27 +267,13 @@ public boolean isConnected() {
return socket != null && usable;
}

public void setCounters(Counter msgInCounter, Counter msgOutCounter) {
this.msgInCounter = msgInCounter;
this.msgOutCounter = msgOutCounter;
}
public Counter getMsgInCounter() {
return msgInCounter;
}
public Counter getMsgOutCounter() {
return msgOutCounter;
}
public void setMeterRegistry(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
public MeterRegistry getMeterRegistry() {
return meterRegistry;
}
public String getServerName() {
return serverName != null ? serverName : getName();
@Override
public void setISOMsgMetrics(ISOMsgMetrics metrics) {
isoMsgMetrics = metrics;
}
public void setServerName(String serverName) {
this.serverName = serverName;
@Override
public ISOMsgMetrics getISOMsgMetrics() {
return isoMsgMetrics;
}

/**
Expand Down Expand Up @@ -367,7 +346,7 @@ protected Socket newSocket(String host, int port) throws IOException {
throw new IOException (e.getMessage());
}
}
protected Socket newSocket (String[] hosts, int[] ports, LogEvent evt)
protected Socket newSocket (String[] hosts, int[] ports, LogEvent evt)
throws IOException
{
Socket s = null;
Expand Down Expand Up @@ -416,8 +395,8 @@ public ServerSocket getServerSocket() {
return serverSocket;
}

/**
* sets socket timeout (as suggested by
/**
* sets socket timeout (as suggested by
* Leonard Thomas <leonard@rhinosystemsinc.com>)
* @param timeout in milliseconds
* @throws SocketException on error
Expand Down Expand Up @@ -484,7 +463,7 @@ public void connect () throws IOException {
}

/**
* Accepts connection
* Accepts connection
* @exception IOException
*/
public void accept(ServerSocket s) throws IOException {
Expand Down Expand Up @@ -551,21 +530,21 @@ protected ISOPackager getDynamicPackager (byte[] header, byte[] image) {
}


/**
/**
* Allow subclasses to override the Default header on
* incoming messages.
* @param image message image
* @return ISOHeader instance
*/
protected ISOHeader getDynamicHeader (byte[] image) {
return image != null ?
return image != null ?
new BaseHeader (image) : null;
}
protected void sendMessageLength(int len) throws IOException { }
protected void sendMessageHeader(ISOMsg m, int len) throws IOException {
protected void sendMessageHeader(ISOMsg m, int len) throws IOException {
if (!isOverrideHeader() && m.getHeader() != null)
serverOut.write(m.getHeader());
else if (header != null)
else if (header != null)
serverOut.write(header);
}
/**
Expand Down Expand Up @@ -618,13 +597,13 @@ protected void getMessageTrailer(ISOMsg m) throws IOException {
getMessageTrailler();
}

protected void getMessage (byte[] b, int offset, int len) throws IOException, ISOException {
protected void getMessage (byte[] b, int offset, int len) throws IOException, ISOException {
serverIn.readFully(b, offset, len);
}
protected int getMessageLength() throws IOException, ISOException {
return -1;
}
protected int getHeaderLength() {
protected int getHeaderLength() {
return header != null ? header.length : 0;
}
protected int getHeaderLength(byte[] b) { return 0; }
Expand All @@ -637,7 +616,7 @@ protected int getHeaderLength(ISOMsg m) {
protected byte[] streamReceive() throws IOException {
return new byte[0];
}
protected void sendMessage (byte[] b, int offset, int len)
protected void sendMessage (byte[] b, int offset, int len)
throws IOException
{
serverOut.write(b, offset, len);
Expand Down Expand Up @@ -956,7 +935,7 @@ public void disconnect () throws IOException {
jfr.commit();
}
socket = null;
}
}
/**
* Issues a disconnect followed by a connect
* @exception IOException
Expand All @@ -978,7 +957,7 @@ public Logger getLogger() {
return logger;
}
public String getOriginalRealm() {
return originalRealm == null ?
return originalRealm == null ?
this.getClass().getName() : originalRealm;
}
/**
Expand Down Expand Up @@ -1070,19 +1049,19 @@ public void removeIncomingFilter (ISOFilter filter) {
public void removeOutgoingFilter (ISOFilter filter) {
removeFilter (filter, ISOMsg.OUTGOING);
}
protected ISOMsg applyOutgoingFilters (ISOMsg m, LogEvent evt)
protected ISOMsg applyOutgoingFilters (ISOMsg m, LogEvent evt)
throws VetoException
{
for (ISOFilter f :outgoingFilters)
m = f.filter (this, m, evt);
return m;
}
protected ISOMsg applyIncomingFilters (ISOMsg m, LogEvent evt)
throws VetoException
protected ISOMsg applyIncomingFilters (ISOMsg m, LogEvent evt)
throws VetoException
{
return applyIncomingFilters (m, null, null, evt);
}
protected ISOMsg applyIncomingFilters (ISOMsg m, byte[] header, byte[] image, LogEvent evt)
protected ISOMsg applyIncomingFilters (ISOMsg m, byte[] header, byte[] image, LogEvent evt)
throws VetoException
{
for (ISOFilter f :incomingFilters) {
Expand Down Expand Up @@ -1114,7 +1093,7 @@ protected byte[] pack (ISOMsg m) throws ISOException {
* @throws ConfigurationException
*/
public void setConfiguration (Configuration cfg)
throws ConfigurationException
throws ConfigurationException
{
this.cfg = cfg;
String h = cfg.get ("host");
Expand All @@ -1123,7 +1102,7 @@ public void setConfiguration (Configuration cfg)
sendTimeout = cfg.getLong ("send-timeout", sendTimeout);
if (h != null && h.length() > 0) {
if (port == 0)
throw new ConfigurationException
throw new ConfigurationException
("invalid port for host '"+h+"'");
setHost (h, port);
setLocalAddress (cfg.get("local-iface", null),cfg.getInt("local-port"));
Expand Down Expand Up @@ -1274,17 +1253,13 @@ private UUID getSocketUUID() {
}

protected void incrementMsgInCounter(ISOMsg m) throws ISOException {
if (meterRegistry != null && m != null && m.hasMTI()) {
var tags = Tags.of("name", getServerName(), "type", "server", "mti", m.getMTI());
msgInCounter = MeterFactory.updateCounter(meterRegistry, MeterInfo.ISOMSG_IN, tags);
msgInCounter.increment();
if (isoMsgMetrics != null) {
isoMsgMetrics.recordMessage(m, MeterInfo.ISOMSG_IN);
}
}
protected void incrementMsgOutCounter(ISOMsg m) throws ISOException {
if (meterRegistry != null && m != null && m.hasMTI()) {
var tags = Tags.of("name", getServerName(), "type", "server", "mti", m.getMTI());
msgOutCounter = MeterFactory.updateCounter(meterRegistry, MeterInfo.ISOMSG_OUT, tags);
msgOutCounter.increment();
if (isoMsgMetrics != null) {
isoMsgMetrics.recordMessage(m, MeterInfo.ISOMSG_OUT);
}
}
}
19 changes: 8 additions & 11 deletions jpos/src/main/java/org/jpos/iso/ISOServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,14 @@ private enum PermLogPolicy {

/**
* @param port port to listen
* @param clientSide client side ISOChannel (where we accept connections)
* @param clientSide client side ISOChannel, used as a "clonable template" to accept new connections
*/
public ISOServer(int port, ServerChannel clientSide, int maxSessions) {
super();
this.port = port;
this.clientSideChannel = clientSide;
this.clientPackager = clientSide.getPackager();
if (clientSide instanceof FilteredChannel) {
FilteredChannel fc = (FilteredChannel) clientSide;
if (clientSide instanceof FilteredChannel fc) {
this.clientOutgoingFilters = fc.getOutgoingFilters();
this.clientIncomingFilters = fc.getIncomingFilters();
}
Expand Down Expand Up @@ -322,13 +321,11 @@ public void run() {
LogEvent ev = new LogEvent()
.withSource(this)
.withTraceId(sessionUUID)
.add(new SessionStart(connectionCount.get(), permitsCount, sessionInfo)
.add(new SessionStart(getActiveConnections(), permitsCount, sessionInfo)
);
if (!checkPermission (socket, ev))
return;
realm = realm + "/" + socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
if (clientSideChannel instanceof BaseChannel bc)
baseChannel.setCounters(bc.getMsgInCounter(), bc.getMsgOutCounter());
}
try {
WeakReference<ISOChannel> wr = new WeakReference<> (channel);
Expand All @@ -337,8 +334,8 @@ public void run() {
while (true) try {
ISOMsg m = channel.receive();
lastTxn = System.currentTimeMillis();
for (Object listener : listeners) {
if (((ISORequestListener) listener).process(channel, m)) {
for (ISORequestListener listener : listeners) {
if (listener.process(channel, m)) {
break;
}
}
Expand All @@ -361,6 +358,7 @@ public void run() {
} catch (Throwable e) {
Logger.log (new LogEvent (this, "session-error", e));
}

try {
channel.disconnect();
fireEvent(new ISOServerClientDisconnectEvent(ISOServer.this, channel));
Expand All @@ -371,7 +369,7 @@ public void run() {
Logger.log(new LogEvent()
.withSource(this)
.withTraceId(sessionUUID)
.add(new SessionEnd(connectionCount.get(), permitsCount, sessionInfo)
.add(new SessionEnd(getActiveConnections(), permitsCount, sessionInfo)
)
);
}
Expand Down Expand Up @@ -471,7 +469,6 @@ private void checkPermission0 (Socket socket, LogEvent evt) throws ISOException
//-- This is the main run for this ISOServer's Thread
@Override
public void run() {
// ServerChannel channel;
if (socketFactory == null) {
socketFactory = this;
}
Expand Down Expand Up @@ -642,7 +639,7 @@ public ISOChannel getLastConnectedISOChannel () {
* @return ISOChannel under the given name
*/
public ISOChannel getISOChannel (String name) {
WeakReference ref = (WeakReference) channels.get (name);
WeakReference ref = channels.get (name);
if (ref != null) {
return (ISOChannel) ref.get ();
}
Expand Down
5 changes: 3 additions & 2 deletions jpos/src/main/java/org/jpos/metrics/MeterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ public enum MeterInfo {
ISOSERVER_CONNECTION_COUNT("jpos.server.connections", "Incoming active connections"),
ISOCHANNEL_CONNECTION_COUNT("jpos.channel.connections", "Outgoing active connections"),

ISOMSG_OUT ("jpos.isomsg", "Transmitted messages", Tags.of ("direction", "out")),
ISOMSG_IN ("jpos.isomsg", "Received messages", Tags.of ("direction", "in")),
ISOMSG_OUT("jpos.isomsg", "Transmitted messages", Tags.of ("direction", "out")),
ISOMSG_IN ("jpos.isomsg", "Received messages", Tags.of ("direction", "in")),

CHANNEL_ACTIVE_CONNECTIONS("jpos.channel.connections", "Active outgoing connections"),
CHANNEL_STATUS("jpos.channel.status", "Channel status"),

Expand Down
Loading
Loading