diff --git a/jpos/src/main/java/module-info.java b/jpos/src/main/java/module-info.java index 50292cd576..7bb4bf4de9 100644 --- a/jpos/src/main/java/module-info.java +++ b/jpos/src/main/java/module-info.java @@ -53,6 +53,8 @@ exports org.jpos.log.render.markdown; exports org.jpos.log.evt; exports org.jpos.core.annotation; + exports org.jpos.metrics; + exports org.jpos.metrics.iso; uses org.jpos.core.EnvironmentProvider; uses org.jpos.log.LogRenderer; diff --git a/jpos/src/main/java/org/jpos/iso/BaseChannel.java b/jpos/src/main/java/org/jpos/iso/BaseChannel.java index 761e46841a..a21d730476 100644 --- a/jpos/src/main/java/org/jpos/iso/BaseChannel.java +++ b/jpos/src/main/java/org/jpos/iso/BaseChannel.java @@ -18,9 +18,6 @@ package org.jpos.iso; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Tags; import org.jpos.core.Configurable; import org.jpos.core.Configuration; import org.jpos.core.ConfigurationException; @@ -31,8 +28,8 @@ import org.jpos.jfr.ChannelEvent; import org.jpos.log.evt.Connect; import org.jpos.log.evt.Disconnect; -import org.jpos.metrics.MeterFactory; import org.jpos.metrics.MeterInfo; +import org.jpos.metrics.iso.ISOMsgMetrics; import org.jpos.util.*; import javax.net.ssl.SSLSocket; @@ -45,7 +42,7 @@ /* * BaseChannel was ISOChannel. Now ISOChannel is an interface - * Revision: 1.34 Date: 2000/04/08 23:54:55 + * Revision: 1.34 Date: 2000/04/08 23:54:55 */ /** @@ -61,7 +58,7 @@ * It now support the new Logger architecture so we will * probably setup ISOChannelPanel to be a LogListener instead * of being an Observer in future releases. - * + * * @author Alejandro P. Revilla * @author Bharavi Gade * @version $Revision$ $Date$ @@ -74,7 +71,7 @@ */ @SuppressWarnings("unchecked") public abstract class BaseChannel extends Observable - implements FilteredChannel, ClientChannel, ServerChannel, FactoryChannel, + implements FilteredChannel, ClientChannel, ServerChannel, FactoryChannel, ISOMsgMetrics.Source, LogSource, Configurable, BaseChannelMBean, Cloneable, ExceptionHandlerAware { private Socket socket; @@ -114,10 +111,8 @@ public abstract class BaseChannel extends Observable private boolean roundRobin = false; private boolean debugIsoError = true; - private Counter msgOutCounter; - private Counter msgInCounter; - private MeterRegistry meterRegistry; - private String serverName; + private ISOMsgMetrics isoMsgMetrics; + private final UUID uuid; private final Map, List> exceptionHandlers = new HashMap<>(); @@ -131,8 +126,8 @@ public BaseChannel () { uuid = UUID.randomUUID(); cnt = new int[SIZEOF_CNT]; name = ""; - incomingFilters = new ArrayList(); - outgoingFilters = new ArrayList(); + incomingFilters = new ArrayList<>(); + outgoingFilters = new ArrayList<>(); setHost(null, 0); } @@ -166,9 +161,7 @@ public BaseChannel (ISOPackager p) throws IOException { * @exception IOException on error * @see ISOPackager */ - public BaseChannel (ISOPackager p, ServerSocket serverSocket) - throws IOException - { + public BaseChannel (ISOPackager p, ServerSocket serverSocket) throws IOException { this(); setPackager (p); setServerSocket (serverSocket); @@ -185,7 +178,7 @@ public void setHost(String host, int port) { this.hosts = new String[] { host }; this.ports = new int[] { port }; } - + /** * initialize an ISOChannel * @param iface server TCP Address @@ -195,7 +188,7 @@ public void setLocalAddress (String iface, int port) { this.localIface = iface; this.localPort = port; } - + /** * @param host to connect (client ISOChannel) @@ -274,27 +267,13 @@ public boolean isConnected() { return socket != null && usable; } - public void setCounters(Counter msgInCounter, Counter msgOutCounter) { - this.msgInCounter = msgInCounter; - this.msgOutCounter = msgOutCounter; - } - public Counter getMsgInCounter() { - return msgInCounter; - } - public Counter getMsgOutCounter() { - return msgOutCounter; - } - public void setMeterRegistry(MeterRegistry meterRegistry) { - this.meterRegistry = meterRegistry; - } - public MeterRegistry getMeterRegistry() { - return meterRegistry; - } - public String getServerName() { - return serverName != null ? serverName : getName(); + @Override + public void setISOMsgMetrics(ISOMsgMetrics metrics) { + isoMsgMetrics = metrics; } - public void setServerName(String serverName) { - this.serverName = serverName; + @Override + public ISOMsgMetrics getISOMsgMetrics() { + return isoMsgMetrics; } /** @@ -367,7 +346,7 @@ protected Socket newSocket(String host, int port) throws IOException { throw new IOException (e.getMessage()); } } - protected Socket newSocket (String[] hosts, int[] ports, LogEvent evt) + protected Socket newSocket (String[] hosts, int[] ports, LogEvent evt) throws IOException { Socket s = null; @@ -416,8 +395,8 @@ public ServerSocket getServerSocket() { return serverSocket; } - /** - * sets socket timeout (as suggested by + /** + * sets socket timeout (as suggested by * Leonard Thomas ) * @param timeout in milliseconds * @throws SocketException on error @@ -484,7 +463,7 @@ public void connect () throws IOException { } /** - * Accepts connection + * Accepts connection * @exception IOException */ public void accept(ServerSocket s) throws IOException { @@ -551,21 +530,21 @@ protected ISOPackager getDynamicPackager (byte[] header, byte[] image) { } - /** + /** * Allow subclasses to override the Default header on * incoming messages. * @param image message image * @return ISOHeader instance */ protected ISOHeader getDynamicHeader (byte[] image) { - return image != null ? + return image != null ? new BaseHeader (image) : null; } protected void sendMessageLength(int len) throws IOException { } - protected void sendMessageHeader(ISOMsg m, int len) throws IOException { + protected void sendMessageHeader(ISOMsg m, int len) throws IOException { if (!isOverrideHeader() && m.getHeader() != null) serverOut.write(m.getHeader()); - else if (header != null) + else if (header != null) serverOut.write(header); } /** @@ -618,13 +597,13 @@ protected void getMessageTrailer(ISOMsg m) throws IOException { getMessageTrailler(); } - protected void getMessage (byte[] b, int offset, int len) throws IOException, ISOException { + protected void getMessage (byte[] b, int offset, int len) throws IOException, ISOException { serverIn.readFully(b, offset, len); } protected int getMessageLength() throws IOException, ISOException { return -1; } - protected int getHeaderLength() { + protected int getHeaderLength() { return header != null ? header.length : 0; } protected int getHeaderLength(byte[] b) { return 0; } @@ -637,7 +616,7 @@ protected int getHeaderLength(ISOMsg m) { protected byte[] streamReceive() throws IOException { return new byte[0]; } - protected void sendMessage (byte[] b, int offset, int len) + protected void sendMessage (byte[] b, int offset, int len) throws IOException { serverOut.write(b, offset, len); @@ -956,7 +935,7 @@ public void disconnect () throws IOException { jfr.commit(); } socket = null; - } + } /** * Issues a disconnect followed by a connect * @exception IOException @@ -978,7 +957,7 @@ public Logger getLogger() { return logger; } public String getOriginalRealm() { - return originalRealm == null ? + return originalRealm == null ? this.getClass().getName() : originalRealm; } /** @@ -1070,19 +1049,19 @@ public void removeIncomingFilter (ISOFilter filter) { public void removeOutgoingFilter (ISOFilter filter) { removeFilter (filter, ISOMsg.OUTGOING); } - protected ISOMsg applyOutgoingFilters (ISOMsg m, LogEvent evt) + protected ISOMsg applyOutgoingFilters (ISOMsg m, LogEvent evt) throws VetoException { for (ISOFilter f :outgoingFilters) m = f.filter (this, m, evt); return m; } - protected ISOMsg applyIncomingFilters (ISOMsg m, LogEvent evt) - throws VetoException + protected ISOMsg applyIncomingFilters (ISOMsg m, LogEvent evt) + throws VetoException { return applyIncomingFilters (m, null, null, evt); } - protected ISOMsg applyIncomingFilters (ISOMsg m, byte[] header, byte[] image, LogEvent evt) + protected ISOMsg applyIncomingFilters (ISOMsg m, byte[] header, byte[] image, LogEvent evt) throws VetoException { for (ISOFilter f :incomingFilters) { @@ -1114,7 +1093,7 @@ protected byte[] pack (ISOMsg m) throws ISOException { * @throws ConfigurationException */ public void setConfiguration (Configuration cfg) - throws ConfigurationException + throws ConfigurationException { this.cfg = cfg; String h = cfg.get ("host"); @@ -1123,7 +1102,7 @@ public void setConfiguration (Configuration cfg) sendTimeout = cfg.getLong ("send-timeout", sendTimeout); if (h != null && h.length() > 0) { if (port == 0) - throw new ConfigurationException + throw new ConfigurationException ("invalid port for host '"+h+"'"); setHost (h, port); setLocalAddress (cfg.get("local-iface", null),cfg.getInt("local-port")); @@ -1274,17 +1253,13 @@ private UUID getSocketUUID() { } protected void incrementMsgInCounter(ISOMsg m) throws ISOException { - if (meterRegistry != null && m != null && m.hasMTI()) { - var tags = Tags.of("name", getServerName(), "type", "server", "mti", m.getMTI()); - msgInCounter = MeterFactory.updateCounter(meterRegistry, MeterInfo.ISOMSG_IN, tags); - msgInCounter.increment(); + if (isoMsgMetrics != null) { + isoMsgMetrics.recordMessage(m, MeterInfo.ISOMSG_IN); } } protected void incrementMsgOutCounter(ISOMsg m) throws ISOException { - if (meterRegistry != null && m != null && m.hasMTI()) { - var tags = Tags.of("name", getServerName(), "type", "server", "mti", m.getMTI()); - msgOutCounter = MeterFactory.updateCounter(meterRegistry, MeterInfo.ISOMSG_OUT, tags); - msgOutCounter.increment(); + if (isoMsgMetrics != null) { + isoMsgMetrics.recordMessage(m, MeterInfo.ISOMSG_OUT); } } } diff --git a/jpos/src/main/java/org/jpos/iso/ISOServer.java b/jpos/src/main/java/org/jpos/iso/ISOServer.java index 0b0346551f..e5acd3f4d2 100644 --- a/jpos/src/main/java/org/jpos/iso/ISOServer.java +++ b/jpos/src/main/java/org/jpos/iso/ISOServer.java @@ -102,15 +102,14 @@ private enum PermLogPolicy { /** * @param port port to listen - * @param clientSide client side ISOChannel (where we accept connections) + * @param clientSide client side ISOChannel, used as a "clonable template" to accept new connections */ public ISOServer(int port, ServerChannel clientSide, int maxSessions) { super(); this.port = port; this.clientSideChannel = clientSide; this.clientPackager = clientSide.getPackager(); - if (clientSide instanceof FilteredChannel) { - FilteredChannel fc = (FilteredChannel) clientSide; + if (clientSide instanceof FilteredChannel fc) { this.clientOutgoingFilters = fc.getOutgoingFilters(); this.clientIncomingFilters = fc.getIncomingFilters(); } @@ -322,13 +321,11 @@ public void run() { LogEvent ev = new LogEvent() .withSource(this) .withTraceId(sessionUUID) - .add(new SessionStart(connectionCount.get(), permitsCount, sessionInfo) + .add(new SessionStart(getActiveConnections(), permitsCount, sessionInfo) ); if (!checkPermission (socket, ev)) return; realm = realm + "/" + socket.getInetAddress().getHostAddress() + ":" + socket.getPort(); - if (clientSideChannel instanceof BaseChannel bc) - baseChannel.setCounters(bc.getMsgInCounter(), bc.getMsgOutCounter()); } try { WeakReference wr = new WeakReference<> (channel); @@ -337,8 +334,8 @@ public void run() { while (true) try { ISOMsg m = channel.receive(); lastTxn = System.currentTimeMillis(); - for (Object listener : listeners) { - if (((ISORequestListener) listener).process(channel, m)) { + for (ISORequestListener listener : listeners) { + if (listener.process(channel, m)) { break; } } @@ -361,6 +358,7 @@ public void run() { } catch (Throwable e) { Logger.log (new LogEvent (this, "session-error", e)); } + try { channel.disconnect(); fireEvent(new ISOServerClientDisconnectEvent(ISOServer.this, channel)); @@ -371,7 +369,7 @@ public void run() { Logger.log(new LogEvent() .withSource(this) .withTraceId(sessionUUID) - .add(new SessionEnd(connectionCount.get(), permitsCount, sessionInfo) + .add(new SessionEnd(getActiveConnections(), permitsCount, sessionInfo) ) ); } @@ -471,7 +469,6 @@ private void checkPermission0 (Socket socket, LogEvent evt) throws ISOException //-- This is the main run for this ISOServer's Thread @Override public void run() { - // ServerChannel channel; if (socketFactory == null) { socketFactory = this; } @@ -642,7 +639,7 @@ public ISOChannel getLastConnectedISOChannel () { * @return ISOChannel under the given name */ public ISOChannel getISOChannel (String name) { - WeakReference ref = (WeakReference) channels.get (name); + WeakReference ref = channels.get (name); if (ref != null) { return (ISOChannel) ref.get (); } diff --git a/jpos/src/main/java/org/jpos/metrics/MeterInfo.java b/jpos/src/main/java/org/jpos/metrics/MeterInfo.java index f614097a31..edf5b083f7 100644 --- a/jpos/src/main/java/org/jpos/metrics/MeterInfo.java +++ b/jpos/src/main/java/org/jpos/metrics/MeterInfo.java @@ -28,8 +28,9 @@ public enum MeterInfo { ISOSERVER_CONNECTION_COUNT("jpos.server.connections", "Incoming active connections"), ISOCHANNEL_CONNECTION_COUNT("jpos.channel.connections", "Outgoing active connections"), - ISOMSG_OUT ("jpos.isomsg", "Transmitted messages", Tags.of ("direction", "out")), - ISOMSG_IN ("jpos.isomsg", "Received messages", Tags.of ("direction", "in")), + ISOMSG_OUT("jpos.isomsg", "Transmitted messages", Tags.of ("direction", "out")), + ISOMSG_IN ("jpos.isomsg", "Received messages", Tags.of ("direction", "in")), + CHANNEL_ACTIVE_CONNECTIONS("jpos.channel.connections", "Active outgoing connections"), CHANNEL_STATUS("jpos.channel.status", "Channel status"), diff --git a/jpos/src/main/java/org/jpos/metrics/iso/ISOMsgCounter.java b/jpos/src/main/java/org/jpos/metrics/iso/ISOMsgCounter.java new file mode 100644 index 0000000000..6ff6dba8b3 --- /dev/null +++ b/jpos/src/main/java/org/jpos/metrics/iso/ISOMsgCounter.java @@ -0,0 +1,352 @@ +package org.jpos.metrics.iso; + +import io.micrometer.core.instrument.*; +import org.jpos.core.Configurable; +import org.jpos.core.Configuration; +import org.jpos.core.ConfigurationException; +import org.jpos.core.Environment; +import org.jpos.iso.ISOMsg; +import org.jpos.metrics.MeterFactory; +import org.jpos.metrics.MeterInfo; +import org.jpos.util.LogEvent; +import org.jpos.util.LogSource; +import org.jpos.util.Logger; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ISOMsgCounter implements ISOMsgMetrics, LogSource, Configurable { + private final static Map> aliases= Map.of( + "mti", (m) -> m.getString(0), + "rc", (m) -> m.getString(39), + "scheme", (m) -> m.getString("113.66"), // jPOS-CMF field + "isemv", (m) -> Boolean.toString(m.hasField(55)), + "ttype", ISOMsgCounter::getTtype, + "itc", ISOMsgCounter::getITC + ); + + private MeterRegistry registry; + + // Store my meters for safe removal later on. Meters are uniquely identified by their Meter.Id. + private final Set meters = ConcurrentHashMap.newKeySet(); + private boolean frozen = false; + + // custom properties + private String metricName; + private String metricDescription; + private Tags tags = Tags.empty(); + private final Map fieldSet = new HashMap<>(); + + private Logger logger; + private String realm; + + public ISOMsgCounter() throws ConfigurationException { + // Configure initial **default global** tags and fieldsets from env vars. + + // Custom tags are added to the Meter. + // Syntax: comma/space separated entries of the form "tag:value" or just "tag" . + var envTags = Environment.get("${"+ENV_CHANNEL_TAGS+"}", DEFAULT_TAGS); + var tagMap = parseTagPairs(envTags, false); + tagMap.forEach((k,v) -> tags = tags.and(k,v)); + + // Fieldsets are tags and values taken from the ISOMsg + // Syntax: comma/space separated entries of the form "alias, tag:alias, tag:isofield". + var envFields = Environment.get("${"+ENV_CHANNEL_FIELDS+"}", DEFAULT_CHANNEL_FIELDS); + var fieldsMap = parseTagPairs(envFields, true); + validateFieldSetMap(fieldsMap); + fieldSet.putAll(fieldsMap); + } + + + /** + * @return This overrides the default implementation, also including the keys from the internal field set. + */ + @Override + public String getMetricSignature() { + List keys = new ArrayList<>(fieldSet.size()*2); + tags.forEach(t -> keys.add(t.getKey())); + fieldSet.forEach((k,_) -> keys.add(k)); + return getMetricName()+"|"+ + keys.stream().sorted().collect(Collectors.joining(",")); + } + + + @Override + public String getMetricName() { + return metricName != null ? metricName : DEFAULT_CHANNEL_METRIC_NAME; + } + @Override + public void setMetricName(String metricName) { + throwIfFrozen(true); + Objects.requireNonNull(metricName, "Metric name can't be null"); + this.metricName = metricName; + } + + + public String getMetricDescription() { + return metricDescription != null ? metricDescription : ""; + } + public void setMetricDescription(String metricDescription) { + this.metricDescription = metricDescription; + } + + @Override + public Tags addTags(Tags tags) { + throwIfFrozen(true); + if (!meters.isEmpty()) { + String name = tags.stream() + .filter(t->"name".equals(t.getKey())) + .map(Tag::getValue).findAny() + .orElse(getMetricName()); + throw new IllegalStateException("ISOMsgCounter "+name+" can't add tags after started"); + } + return (this.tags= this.tags.and(tags)); + } + public Tags getTags() { + return Tags.of(tags); + } + + @Override + public void recordMessage(ISOMsg m) { + if (registry != null && m != null) { + throwIfFrozen(false); + Tags ft = resolveFieldTags(m, fieldSet); + Counter c = MeterFactory.updateCounter(registry, + getMetricName(), + tags.and(ft), + getMetricDescription()); + meters.add(c); + c.increment(); + } + } + + @Override + public void recordMessage(ISOMsg m, MeterInfo meterInfo) { + if (registry != null && m != null) { + throwIfFrozen(false); + Tags ft = resolveFieldTags(m, fieldSet); + String myName = getMetricName(); + + Counter c; + if (!DEFAULT_CHANNEL_METRIC_NAME.equals(myName)) + c = MeterFactory.updateCounter(registry, + getMetricName(), + meterInfo.add(tags).and(ft), // allow our tags to override meterInfo's + getMetricDescription()); + else + c = MeterFactory.updateCounter(registry, meterInfo, tags.and(ft)); + meters.add(c); + c.increment(); + } + } + + + @Override + public boolean register(MeterRegistry registry) { + Objects.requireNonNull(registry, "Null registry passed to register() method."); + this.registry = registry; + frozen = true; + return true; + } + + @Override + public void unregister() { + removeMeters(); + registry = null; + } + + + @Override + public MeterRegistry getRegistry() { + return registry; + } + + + @Override + public void removeMeters() { + if (registry != null) { + LogEvent evt = logger != null ? new LogEvent(this, "info", "Removing meters: ") : null; + + // flag will make new recordMessage calls fail, in a NON-thread safe way + // but this is normally called after the channel is being stopped anyway + frozen = false; + meters.forEach(m -> { + if (evt != null) evt.addMessage(m.getId()); + registry.remove(m); + }); + meters.clear(); + if (evt != null) + Logger.log(evt); + } + } + + // ============= configuration ============= + + @Override + public void setConfiguration(Configuration cfg) throws ConfigurationException { + String name = cfg.get("name", null); + if (name != null) + setMetricName(name); + boolean customName = !DEFAULT_CHANNEL_METRIC_NAME.equals(getMetricName()); + + setMetricDescription(cfg.get("description", null)); + + // Process custom tag overrides (global defaults were handled in constructor). + // Custom config overrides can only override the values of the pre-existing global env tags. + // New tags can't be added, global tags can't be removed. + // + // Exception: If this class has a custom metric name, then it can define its own tag set. + boolean hasTags = cfg.get("tags", null) != null; + if (customName && hasTags) + tags = Tags.empty(); // start afresh if custom metric has tags + + var currTags = getTagsAsMap(); + var ovrMap = parseTagPairs(cfg.get("tags", ""), false); + for (var ent : ovrMap.entrySet()) { + if (currTags.containsKey(ent.getKey()) || customName) // if known tag, or custom name + currTags.put(ent.getKey(), ent.getValue()); // then allow override! + else + throw new ConfigurationException("Attempt to add unknown metric tag: '"+ent.getKey()+"'"); + } + currTags.forEach((k,v) -> tags = tags.and(k,v)); // add/override all custom tags to our tags + + + // Process custom isofield overrides (global defaults were handled in constructor). + // Custom overrides can only override pre-existing env tags, unless this class has a custom metric name. + boolean hasFields = cfg.get("fields", null) != null; + if (customName && hasFields) + fieldSet.clear(); // start afresh if custom metric has fields + + var fieldsOvrMap = parseTagPairs(cfg.get("fields", ""), true); + validateFieldSetMap(fieldsOvrMap); + for (var ent : fieldsOvrMap.entrySet()) { + if (fieldSet.containsKey(ent.getKey()) || customName) // known tag, or custom metric + fieldSet.put(ent.getKey(), ent.getValue()); // allow, override! + else + throw new ConfigurationException( + "Attempt to add unknown metric isofield tag: '"+ent.getKey()+"'"); + } + } + + + + // copySingleTag: copy the tag name as value if no colon syntax; else, set value as empty string + protected Map parseTagPairs(String tp, boolean copySingleTag) { + Map ret = new HashMap<>(); + String[] tagPairs = tp.trim().split("[, ]+"); + for (String pair : tagPairs) { + if (pair.isEmpty()) continue; // avoids possible commas at beginning of tp + + String[] tv = pair.trim().split(":"); + ret.put(tv[0], + tv.length >= 2 ? tv[1] : + copySingleTag ? tv[0] : ""); + } + return ret; + } + + protected void validateFieldSetMap(Map fieldsMap) throws ConfigurationException { + for (var valexpr : fieldsMap.values()) { + boolean isField = valexpr.matches("^[0-9]+(\\.[0-9]+)*$"); // is isomsg field path dot-syntax? + if (!isField && aliases.get(valexpr) == null) + throw new ConfigurationException("Unknown metric tag alias for fieldset: '"+valexpr+"'"); + } + } + + + // ============= some helper methods ============= + + private void throwIfFrozen(boolean frozenCondition) { + if (frozen == frozenCondition) + throw new IllegalStateException(frozen ? + "Can't modify this ISOMsgCounter after registration ("+getMetricSignature()+")" : + "Can't use this ISOMsgCounter before registration ("+getMetricSignature()+")" + ); + } + + + /** returns a clone set, which may not be up to date next time you use it */ + protected Set getMeters() { + return new HashSet<>(meters); + } + + // useful for tests and debugging + protected Map getFieldSet() { + return Map.copyOf(fieldSet); + } + + + // Make Tags easy to navigate; do not abuse of this one unless you need repeated querying + public Map getTagsAsMap() { + Map tm= new HashMap<>(); // make Tags easy to navigate + getTags().forEach(t->tm.put(t.getKey(),t.getValue())); + return tm; + } + + private static String getTtype(ISOMsg m) { + return m.hasField(3) ? m.getString(3).substring(0,2) : ""; + } + + private static String getITC(ISOMsg m) { + String mti = m.getString(0); + if (mti == null || mti.trim().isEmpty()) return ""; + // some common fields to make an ITC from + return Stream.of(mti, getTtype(m), m.getString(24), m.getString(25), m.getString(70)) + .filter(s -> s != null && !s.isEmpty()) + .collect(Collectors.joining(".")); + } + + + /** + * Hook for subclasses to resolve, against an ISOMsg, the valexpr part of a tag:valexpr in a fieldset. + *
+ * A subclass may add or override its own aliases, or have a special way to convert "valexpr" + * to a String taken from the given ISOMsg. + *
+ * If the subclass can't resolve the alias/valexpr, it may call super (i.e. this method) + * as a fallback. + */ + protected String resolveValExpr(ISOMsg m, String val) { + var fun = aliases.get(val); // check if valexpr is a registered alias + val = fun != null ? fun.apply(m) : m.getString(val); + return val != null ? val : ""; + } + + /** + * Returns a micrometer {@link Tags}, with keys and values resolved from a given fieldset against + * a given ISOMsg.
+ * Some of the valexprs in the fieldset may be aliases that need to be resolved to an ISOMsg field, + * and the field path is used to get the value from the ISOMsg.
+ * This method relies on the protected {@link #resolveValExpr(ISOMsg, String)} to resolve each valexpr. + * + * @param m the ISOMsg + * @return a micrometer {@link Tags} with all the tags from fieldset and the resolved values from the message + */ + private Tags resolveFieldTags(ISOMsg m, Map fieldset) { + Tags tt = Tags.empty(); + // each entry is {tag,valexpr}, where valexpr may be an alias or an isofield path + for (var ent : fieldset.entrySet()) { + String val = resolveValExpr(m, ent.getValue()); + tt = tt.and(ent.getKey(), val); + } + return tt; + } + + + @Override + public void setLogger(Logger logger, String realm) { + this.logger = logger; + this.realm = realm; + } + @Override + public String getRealm() { + return realm; + } + @Override + public Logger getLogger() { + return logger; + } +} diff --git a/jpos/src/main/java/org/jpos/metrics/iso/ISOMsgMetrics.java b/jpos/src/main/java/org/jpos/metrics/iso/ISOMsgMetrics.java new file mode 100644 index 0000000000..bf663697a3 --- /dev/null +++ b/jpos/src/main/java/org/jpos/metrics/iso/ISOMsgMetrics.java @@ -0,0 +1,116 @@ +package org.jpos.metrics.iso; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; +import org.jpos.iso.ISOMsg; +import org.jpos.metrics.MeterInfo; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public interface ISOMsgMetrics { + String DEFAULT_CHANNEL_METRIC_NAME = MeterInfo.ISOMSG_IN.id(); + + String ENV_CHANNEL_TAGS = "metrics.channel.tags"; + String DEFAULT_TAGS = "name, type, direction"; + + String ENV_CHANNEL_FIELDS = "metrics.channel.fields"; + String DEFAULT_CHANNEL_FIELDS = "mti"; + + interface Source{ + void setISOMsgMetrics(ISOMsgMetrics metrics); + ISOMsgMetrics getISOMsgMetrics(); + } + + void setMetricName(String metricName); + String getMetricName(); + + String getMetricDescription(); + void setMetricDescription(String metricDescription); + + Tags addTags(Tags tags); + default Tags addTags(String ...tags) { return addTags(Tags.of(tags)); } + default Tags getTags() { return addTags(Tags.empty()); } + + /** + * Records an {@link ISOMsg} in the meter registry.
+ * The metric name and tags will be taken strictly from this object's + * configuration.
+ * If this object hasn't been successfully registered, it throws an + * {@link IllegalStateException}. + * + * @param m the {@link ISOMsg} to record. + * @throws IllegalStateException when this object hasn't been registered + */ + void recordMessage(ISOMsg m) throws IllegalStateException; + + /** + * Records an {@link ISOMsg} in the meter registry.
+ * Similar to {@link #recordMessage(ISOMsg)} but using the metric name, description and maybe some + * tags taken from the {@link MeterInfo} argument. + *

+ * If the metric for that combination of {@link MeterInfo} values and local values fails to register + * in the global {@link MeterRegistry} (or any underlying one like the Prometheus registry), the method + * may throw an {@link IllegalStateException}. This also happens if this object hasn't been successfully + * registered by The metric name and tags will be taken from + * what has been configured. If this object hasn't been successfully registered, it throws an + * {@link IllegalStateException}. + * + * @param m the {@link ISOMsg} to record. + * @throws IllegalStateException when this object hasn't been registered + */ + void recordMessage(ISOMsg m, MeterInfo meterInfo) throws IllegalStateException; + + + /** + * Register this object to work with a given {@link MeterRegistry}.
+ * + * This method may serve more than one purpose in the object's lifecycle: + *

    + *
  • Assign a {@link MeterRegistry} to be used for the created meters. + * (The registry can be obtained by calling {@link #getRegistry()})
  • + *
  • Before this object has been registered, it can be configured by setting tags, etc., + * but attempting to record a message (e.g. through {@link #recordMessage(ISOMsg)}) + * will throw an {@link IllegalStateException}.
  • + *
  • After it has been registered, it's ready to record messages. + * However, it can't be configured any longer, or it will throw an {@link IllegalStateException}. + * The object's configuration can be considered "frozen".
  • + *
  • In some (future) implementation, it may make use of the {@link #getMetricSignature()} to + * do some caching to ensure that every metric name has only one set of tag keys, thus avoiding + * metrics name+keyset collision in {@code PrometheusMeterRegistry}.
  • + *
+ * + * The {@link #unregister()} method should be called when done using this object. + * + * @return true if successful, false if there was an error having this registered + */ + boolean register(MeterRegistry registry); + + /** + * It calls {@link #removeMeters()} and clears its internal reference to its {@link MeterRegistry}.
+ * + * It will also "unfreeze" the object, making it available for reconfiguration. + */ + void unregister(); + + MeterRegistry getRegistry(); + + void removeMeters(); + + + /** + * A unique meter signature, concatenating the meter name, vertical pipe, comma-separated sorted list of all tag keys + * that this object produces.
+ * Default implementation uses the values from {@link #getMetricName()} and {@link #getTags()}. + * A concrete implementation must make sure of gathering all the appropriate tag keys from internal config state + * which may include more than what's returned by {@link #getTags()}. + * @return The unique metric signature. + */ + default String getMetricSignature() { + List keys = new ArrayList<>(); + getTags().forEach(t -> keys.add(t.getKey())); + return getMetricName()+"|"+ + keys.stream().sorted().collect(Collectors.joining(",")); + } +} diff --git a/jpos/src/main/java/org/jpos/q2/iso/ChannelAdaptor.java b/jpos/src/main/java/org/jpos/q2/iso/ChannelAdaptor.java index 1b3881171a..0df24389bc 100644 --- a/jpos/src/main/java/org/jpos/q2/iso/ChannelAdaptor.java +++ b/jpos/src/main/java/org/jpos/q2/iso/ChannelAdaptor.java @@ -18,7 +18,6 @@ package org.jpos.q2.iso; -import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.binder.BaseUnits; @@ -31,6 +30,8 @@ import org.jpos.iso.*; import org.jpos.metrics.MeterFactory; import org.jpos.metrics.MeterInfo; +import org.jpos.metrics.iso.ISOMsgCounter; +import org.jpos.metrics.iso.ISOMsgMetrics; import org.jpos.q2.QBeanSupport; import org.jpos.q2.QFactory; import org.jpos.space.Space; @@ -81,8 +82,6 @@ public class ChannelAdaptor private Gauge connectionsGauge; - private Counter msgOutCounter; - private Counter msgInCounter; @Config("soft-stop") private long softStop; public ChannelAdaptor () { @@ -135,7 +134,7 @@ public void destroyService () { } public synchronized void setReconnectDelay (long delay) { - getPersist().getChild ("reconnect-delay") + getPersist().getChild ("reconnect-delay") .setText (Long.toString (delay)); this.delay = delay; setModified (true); @@ -202,9 +201,8 @@ public String getOutQueue () { return out; } - public ISOChannel newChannel (Element e, QFactory f) - throws ConfigurationException - { + /** Parses a {@code } element, returning an {@link ISOChannel} */ + public ISOChannel newChannel (Element e, QFactory f) throws ConfigurationException { String channelName = QFactory.getAttributeValue (e, "class"); String packagerName = QFactory.getAttributeValue (e, "packager"); @@ -226,8 +224,58 @@ public ISOChannel newChannel (Element e, QFactory f) addExceptionHandlers((ExceptionHandlerAware) channel, e, f); } + if (channel instanceof ISOMsgMetrics.Source metricsChannel) { + String type = "default"; // default alias, in case metrics not defined + String clazz = null; + + Element met = e.getChild("metrics"); + if (met != null) { + if (QFactory.isEnabled(met)) { + clazz = QFactory.getAttributeValue(met, "class"); + String typeAttr = QFactory.getAttributeValue(met, "type"); + type = (clazz != null) ? "class" : // class attribute has precedence over type + (typeAttr != null) ? typeAttr : + type; + } else { + type = "none"; // equivalent to type="none" + } + } + + ISOMsgMetrics m = switch (type) { + case "none" -> null; + + case "default" -> { + var mc = new ISOMsgCounter(); + if (met != null) + f.setLogger(mc, met); + else + mc.setLogger(this.getLog().getLogger(), this.getRealm()+"/metrics"); + yield mc; + } + + case "counter" -> { + var mc = new ISOMsgCounter(); + f.setLogger(mc, met); + f.setConfiguration(mc, met); + yield mc; + } + + case "class" -> { + ISOMsgMetrics mc = f.newInstance(clazz); + f.setLogger(mc, met); + f.setConfiguration(mc, met); + yield mc; + } + + default -> throw new ConfigurationException("Unknown metric type '"+type+"'"); + }; + + metricsChannel.setISOMsgMetrics(m); + } // metrics config + if (getName () != null) channel.setName (getName ()); + return channel; } @@ -252,7 +300,6 @@ else if ("both".equalsIgnoreCase(direction)) { } - protected ISOChannel initChannel () throws ConfigurationException { Element persist = getPersist (); Element e = persist.getChild ("channel"); @@ -260,6 +307,7 @@ protected ISOChannel initChannel () throws ConfigurationException { throw new ConfigurationException ("channel element missing"); ISOChannel c = newChannel (e, getFactory()); + String socketFactoryString = getSocketFactory(); if (socketFactoryString != null && c instanceof FactoryChannel) { ISOClientSocketFactory sFac = getFactory().newInstance(socketFactoryString); @@ -269,8 +317,10 @@ protected ISOChannel initChannel () throws ConfigurationException { getFactory().setConfiguration (sFac, e); ((FactoryChannel)c).setSocketFactory(sFac); } + return c; } + protected void initSpaceAndQueues () throws ConfigurationException { Element persist = getPersist (); sp = grabSpace (persist.getChild ("space")); @@ -305,15 +355,14 @@ public void run () { if (!running()) break; Object o = sp.in (in, delay); - if (o instanceof ISOMsg) { + if (o instanceof ISOMsg m) { if (!channel.isConnected()) { // push back the message so it can be handled by another channel adaptor sp.push(in, o); continue; } - channel.send ((ISOMsg) o); + channel.send(m); tx++; - incrementMsgOutCounter((ISOMsg) o); } else if (o instanceof Integer) { if ((int)o != hashCode()) { // STOP indicator seems to be for another channel adaptor @@ -326,7 +375,7 @@ public void run () { else if (keepAlive && channel.isConnected() && channel instanceof BaseChannel) { ((BaseChannel)channel).sendKeepAlive(); } - } catch (ISOFilter.VetoException e) { + } catch (ISOFilter.VetoException e) { // getLog().warn ("channel-sender-"+in, e.getMessage ()); } catch (ISOException e) { // getLog().warn ("channel-sender-"+in, e.getMessage ()); @@ -334,7 +383,7 @@ else if (keepAlive && channel.isConnected() && channel instanceof BaseChannel) { disconnect (); } ISOUtil.sleep (1000); // slow down on errors - } catch (Exception e) { + } catch (Exception e) { // getLog().warn ("channel-sender-"+in, e.getMessage ()); disconnect (); ISOUtil.sleep (1000); @@ -374,7 +423,6 @@ public void run () { continue; } ISOMsg m = channel.receive (); - incrementMsgInCounter(m); rx++; lastTxn = System.currentTimeMillis(); if (timeout > 0) @@ -401,7 +449,7 @@ public void run () { sp.push (in, hashCode()); // wake-up Sender ISOUtil.sleep(1000); } - } catch (Exception e) { + } catch (Exception e) { if (running()) { // getLog().warn ("channel-receiver-"+out, e); sp.out (reconnect, Boolean.TRUE, delay); @@ -523,9 +571,12 @@ protected void append (StringBuilder sb, String name, int value) { sb.append (name); sb.append (value); } + private void initMeters() { - var tags = Tags.of("name", getName(), "type", "client"); + var tags = Tags.of("name", getName(), + "type", "client"); var registry = getServer().getMeterRegistry(); + connectionsGauge = MeterFactory.gauge (registry, MeterInfo.ISOCHANNEL_CONNECTION_COUNT, @@ -533,27 +584,24 @@ private void initMeters() { BaseUnits.SESSIONS, () -> isConnected() ? 1 : 0 ); - } - protected void incrementMsgInCounter(ISOMsg m) throws ISOException { - if (m != null && m.hasMTI()) { - var tags = Tags.of("name", getName(), "type", "client", "mti", m.getMTI()); - msgInCounter = MeterFactory.updateCounter(getServer().getMeterRegistry(), MeterInfo.ISOMSG_IN, tags); - msgInCounter.increment(); - } - } - protected void incrementMsgOutCounter(ISOMsg m) throws ISOException { - if (m != null && m.hasMTI()) { - var tags = Tags.of("name", getName(), "type", "client", "mti", m.getMTI()); - msgOutCounter = MeterFactory.updateCounter(getServer().getMeterRegistry(), MeterInfo.ISOMSG_OUT, tags); - msgOutCounter.increment(); + + if (channel instanceof ISOMsgMetrics.Source ms) { + ISOMsgMetrics mtr = ms.getISOMsgMetrics(); + if (mtr != null) { + mtr.addTags(tags); + mtr.register(registry); + } } } + private void removeMeters() { var registry = getServer().getMeterRegistry(); registry.remove(connectionsGauge); - if (msgInCounter != null) - registry.remove(msgInCounter); - if (msgOutCounter != null) - registry.remove(msgOutCounter); + + if (channel instanceof ISOMsgMetrics.Source ms) { + ISOMsgMetrics mtr = ms.getISOMsgMetrics(); + if (mtr != null) + mtr.removeMeters(); + } } } diff --git a/jpos/src/main/java/org/jpos/q2/iso/QServer.java b/jpos/src/main/java/org/jpos/q2/iso/QServer.java index e08eb2425a..359b2196cc 100644 --- a/jpos/src/main/java/org/jpos/q2/iso/QServer.java +++ b/jpos/src/main/java/org/jpos/q2/iso/QServer.java @@ -18,7 +18,6 @@ package org.jpos.q2.iso; -import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.binder.BaseUnits; @@ -28,6 +27,7 @@ import org.jpos.iso.*; import org.jpos.metrics.MeterFactory; import org.jpos.metrics.MeterInfo; +import org.jpos.metrics.iso.ISOMsgMetrics; import org.jpos.q2.QBeanSupport; import org.jpos.q2.QFactory; import org.jpos.space.LocalSpace; @@ -37,7 +37,6 @@ import org.jpos.util.LogSource; import org.jpos.util.NameRegistrar; -import java.util.Iterator; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; @@ -56,7 +55,7 @@ public class QServer private int port = 0; private int maxSessions = 100; private String channelString, packagerString, socketFactoryString; - private ISOChannel channel = null; + private ISOChannel channel = null; // is never connected; but passed to ISOServer as a clonable "template" for new connections private ISOServer server; protected LocalSpace sp; private String inQueue; @@ -65,8 +64,6 @@ public class QServer AtomicInteger msgn = new AtomicInteger(); private Gauge connectionsGauge; - private Counter msgOutCounter; - private Counter msgInCounter; private static final String CHANNEL_NAME_REGEXP = " (?=\\d+ \\S+:\\S+)"; public QServer () { @@ -86,7 +83,7 @@ private void newChannel () throws ConfigurationException { throw new ConfigurationException ("channel element missing"); } - ChannelAdaptor adaptor = new ChannelAdaptor (); + ChannelAdaptor adaptor = new ChannelAdaptor (); // leverage adaptor's newChannel logic channel = adaptor.newChannel (e, getFactory ()); } @@ -379,29 +376,35 @@ public boolean process(ISOSource source, ISOMsg m) { } private void initMeters() { - var tags = Tags.of("name", getName(), "type", "server"); + var tags = Tags.of("name", getName(), + "type", "server"); var registry = getServer().getMeterRegistry(); + connectionsGauge = MeterFactory.gauge (registry, MeterInfo.ISOSERVER_CONNECTION_COUNT, - tags, + tags.and("port", ""+getPort()), BaseUnits.SESSIONS, server::getActiveConnections - ); + ); - if (channel instanceof BaseChannel baseChannel) { - baseChannel.setCounters(msgInCounter, msgOutCounter); - baseChannel.setMeterRegistry(registry); - baseChannel.setServerName(getName()); + if (channel instanceof ISOMsgMetrics.Source ms) { + ISOMsgMetrics mtr = ms.getISOMsgMetrics(); + if (mtr != null) { + mtr.addTags(tags); + mtr.register(registry); + } } } private void removeMeters() { var registry = getServer().getMeterRegistry(); registry.remove(connectionsGauge); - if (msgInCounter != null) - registry.remove(msgInCounter); - if (msgOutCounter != null) - registry.remove(msgOutCounter); + + if (channel instanceof ISOMsgMetrics.Source ms) { + ISOMsgMetrics mtr = ms.getISOMsgMetrics(); + if (mtr != null) + mtr.removeMeters(); + } } } diff --git a/jpos/src/test/java/org/jpos/iso/BaseChannelTest.java b/jpos/src/test/java/org/jpos/iso/BaseChannelTest.java index 8148cb3627..627aa2d90d 100644 --- a/jpos/src/test/java/org/jpos/iso/BaseChannelTest.java +++ b/jpos/src/test/java/org/jpos/iso/BaseChannelTest.java @@ -37,8 +37,6 @@ import java.util.Collection; import java.util.Properties; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.MeterRegistry; import org.jpos.bsh.BSHFilter; import org.jpos.core.Configuration; import org.jpos.core.SimpleConfiguration; @@ -69,8 +67,6 @@ import org.jpos.iso.packager.ISOBaseValidatingPackager; import org.jpos.iso.packager.PostPackager; import org.jpos.iso.packager.XMLPackager; -import org.jpos.metrics.MeterFactory; -import org.jpos.metrics.MeterInfo; import org.jpos.util.LogEvent; import org.jpos.util.Logger; import org.jpos.util.NameRegistrar; @@ -1175,52 +1171,4 @@ public void testStreamReceive() throws Throwable { byte[] result = aSCIIChannel.streamReceive(); assertEquals(0, result.length, "result.length"); } - - @Test - public void testIncrementMsgInCounter() throws Exception { - BaseChannel channel = new XMLChannel(new XMLPackager()); - channel.setServerName("test-server"); - channel.setMeterRegistry(mock(MeterRegistry.class)); - - ISOMsg msg = mock(ISOMsg.class); - when(msg.hasMTI()).thenReturn(true); - when(msg.getMTI()).thenReturn("0200"); - - Counter counter = mock(Counter.class); - try (var mocked = mockStatic(MeterFactory.class)) { - mocked.when(() -> MeterFactory.updateCounter(any(), any(), any())) - .thenReturn(counter); - - channel.incrementMsgInCounter(msg); - - mocked.verify(() -> MeterFactory.updateCounter( - eq(channel.getMeterRegistry()), eq(MeterInfo.ISOMSG_IN), any() - )); - verify(counter).increment(); - } - } - - @Test - public void testIncrementMsgOutCounter() throws Exception { - BaseChannel channel = new XMLChannel(new XMLPackager()); - channel.setServerName("test-server"); - channel.setMeterRegistry(mock(MeterRegistry.class)); - - ISOMsg msg = mock(ISOMsg.class); - when(msg.hasMTI()).thenReturn(true); - when(msg.getMTI()).thenReturn("0210"); - - Counter counter = mock(Counter.class); - try (var mocked = mockStatic(MeterFactory.class)) { - mocked.when(() -> MeterFactory.updateCounter(any(), any(), any())) - .thenReturn(counter); - - channel.incrementMsgOutCounter(msg); - - mocked.verify(() -> MeterFactory.updateCounter( - eq(channel.getMeterRegistry()), eq(MeterInfo.ISOMSG_OUT), any() - )); - verify(counter).increment(); - } - } } diff --git a/jpos/src/test/java/org/jpos/metrics/iso/ISOMsgCounterTest.java b/jpos/src/test/java/org/jpos/metrics/iso/ISOMsgCounterTest.java new file mode 100644 index 0000000000..784b07da7d --- /dev/null +++ b/jpos/src/test/java/org/jpos/metrics/iso/ISOMsgCounterTest.java @@ -0,0 +1,220 @@ +package org.jpos.metrics.iso; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import org.jdom2.Document; +import org.jdom2.Element; +import org.jdom2.input.SAXBuilder; +import org.jpos.core.Configuration; +import org.jpos.core.ConfigurationException; +import org.jpos.core.SimpleConfiguration; +import org.jpos.iso.BaseChannel; +import org.jpos.iso.ISOException; +import org.jpos.iso.ISOMsg; +import org.jpos.q2.Q2; +import org.jpos.q2.iso.ChannelAdaptor; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.StringReader; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +public class ISOMsgCounterTest{ + + @BeforeEach + public void setupEnv() { + System.setProperty("metrics.channel.tags", + "type:unknown direction , , whatever:hello"); // global default tags size 3 + + System.setProperty("metrics.channel.fields", + " , itc, rc, geo:113.27"); // global fieldset size 3 + } + + @Test + public void testMetricSginature() throws ConfigurationException { + var mc= new ISOMsgCounter(); + mc.setMetricName("test.metric"); + assertEquals("test.metric|direction,geo,itc,rc,type,whatever", + mc.getMetricSignature()); + } + + @Test + public void testFieldSets() throws ConfigurationException, ISOException { + Configuration cfg = new SimpleConfiguration(); + cfg.put("fields", "rc:39, geo:scheme"); // overrides + + var mc= new ISOMsgCounter(); + mc.setConfiguration(cfg); + var fs = mc.getFieldSet(); + + assertEquals(3, fs.size()); + assertEquals("itc", fs.get("itc")); // not overridden, single alias + assertEquals("39", fs.get("rc")); // overridden, from rc alias to 39 + assertEquals("scheme", fs.get("geo")); // overridden from 113.27 to scheme + + var m = new ISOMsg(); + m.setMTI("0200"); + m.set(3, "20"); + assertEquals("0200.20", mc.resolveValExpr(m, "itc"), "itc alias not resolved correctly"); + } + + @Test + public void testUnknownFieldSets() throws ConfigurationException { + var mc= new ISOMsgCounter(); // constructor configures global defaults + + Configuration cfg = new SimpleConfiguration(); + cfg.put("fields", "rc:39, amount:4"); // amount is unknown in metrics.channel.fields + ConfigurationException ex = assertThrows(ConfigurationException.class, + () -> mc.setConfiguration(cfg)); + + assertTrue(ex.getMessage().matches(".*unknown.*'amount'.*"), + "Expected exception about unknown isofield tag 'amount', but got \""+ex.getMessage()+"\""); + } + + @Test + public void testExtraCustomTags() throws ConfigurationException { + var mc= new ISOMsgCounter(); + Map tm= mc.getTagsAsMap(); + + // first test the unchanged defaults + assertEquals(3, tm.size()); + assertEquals("unknown", tm.get("type")); + assertEquals("", tm.get("direction")); // direction given as single tag, so default value empty + assertEquals("hello", tm.get("whatever")); + assertEquals(null, tm.get("")); + + // now add overrides + Configuration cfg = new SimpleConfiguration(); + cfg.put("tags", + "type:server, whatever:goodbye,,, whatever:adios"); + // last occurrence of "whatever" should override the others (useless, but we don't throw error) + mc.setConfiguration(cfg); + tm= mc.getTagsAsMap(); // get updated copy of tags + + assertEquals(3, tm.size()); + assertEquals("server", tm.get("type")); // overridden + assertEquals("", tm.get("direction")); // unchanged + assertEquals("adios", tm.get("whatever")); // overridden, keeping last value + } + + + @Test + public void testUnknownTags() throws ConfigurationException { + var mc= new ISOMsgCounter(); // constructor configures global defaults + + Configuration cfg = new SimpleConfiguration(); + cfg.put("tags", "type:server, color:green"); // color is unknown in metrics.channel.tags + + ConfigurationException ex = assertThrows(ConfigurationException.class, + () -> mc.setConfiguration(cfg)); + + assertTrue(ex.getMessage().matches(".*unknown.*'color'.*"), + "Expected exception about unknown tag 'color', but got \""+ex.getMessage()+"\""); + } + + + @Test + public void testCustomMetricWithTags() throws ConfigurationException { + var mc= new ISOMsgCounter(); // constructor configures global defaults + mc.setMetricName("test.metric"); + + Configuration cfg = new SimpleConfiguration(); + cfg.put("tags", "planet:jupiter, color:green"); // custom tags are reset for custom metric name + mc.setConfiguration(cfg); + + Map tm= mc.getTagsAsMap(); + + assertEquals(2, tm.size()); + assertEquals("jupiter", tm.get("planet")); + assertEquals("green", tm.get("color")); + assertEquals(null, tm.get("whatever")); + + // default isofield tags added to our custom extra tags + assertEquals("test.metric|color,geo,itc,planet,rc", mc.getMetricSignature()); + } + + + @Test + public void testCustomMetricFromXML() throws Exception { + String chxml = """ + + + + + + + + + + """; + + SAXBuilder saxBuilder = new SAXBuilder(); + Document doc = saxBuilder.build(new StringReader(chxml)); + Element el= doc.getRootElement(); + + Q2 q2= new Q2(new String[0]); + try { + q2.start(); + boolean ready = q2.ready(10000); + assertTrue(ready, "Could not start Q2"); + + MeterRegistry registry = q2.getMeterRegistry(); + assertNotNull(registry, "Can't find Q2 MeterRegistry"); + + ChannelAdaptor adp= new ChannelAdaptor(); + BaseChannel ch = (BaseChannel)adp.newChannel(el, q2.getFactory()); + ISOMsgCounter isom = (ISOMsgCounter) ch.getISOMsgMetrics(); + isom.register(registry); + + var tagMap = isom.getTagsAsMap(); + + assertNotNull(isom, "Channel should have an ISOMsgMetrics but it was null"); + assertEquals("bbb_counter", isom.getMetricName()); + assertEquals("some_type", tagMap.get("type")); + assertEquals("5432", tagMap.get("port")); + assertEquals("", tagMap.get("empty")); + + var meters = isom.getMeters(); + assertEquals(0, meters.size()); + + var m = new ISOMsg(); + m.setMTI("0200"); + m.set(3, "20"); + isom.recordMessage(m); + + meters = isom.getMeters(); + assertEquals(1, meters.size()); + + var someMeterOpt = meters.stream().findAny(); + assertNotNull(someMeterOpt.get(), "No meters were registered after recording a message"); + + var someMeter = someMeterOpt.get(); + assertInstanceOf(Counter.class, someMeter, "The meter should be a Counter but it's a "+someMeter.getClass()); + Counter c1 = (Counter)someMeter; + assertEquals(1, c1.count(), "The count should be 1 after recording just one message"); + + Meter c2 = registry.find(someMeter.getId().getName()).meter(); + assertSame(c1, c2); + + isom.removeMeters(); + meters = isom.getMeters(); + assertEquals(0, meters.size(), "Meter not removed from ISOMsgCounter"); + + c2 = registry.find(someMeter.getId().getName()).meter(); + assertNull(c2, "Meter not removed from ISOMsgCounter"); + + + System.out.println(isom.getTags()); + } finally { + q2.shutdown(); + } + + + } + + +}