Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,18 @@ public enum CassandraRelevantProperties
CUSTOM_HINTS_HANDLER("cassandra.custom_hints_handler"),
CUSTOM_HINTS_RATE_LIMITER_FACTORY("cassandra.custom_hints_rate_limiter_factory"),
CUSTOM_INDEX_BUILD_DECIDER("cassandra.custom_index_build_decider"),

/**
* Which class to use for internode metrics for inbound connections.
* The provided class name must point to an implementation of {@link org.apache.cassandra.metrics.InternodeInboundMetrics}.
*/
CUSTOM_INTERNODE_INBOUND_METRICS_PROVIDER_PROPERTY("cassandra.custom_internode_inbound_metrics_provider_class"),
/**
* Which class to use for internode metrics for {@link org.apache.cassandra.net.OutboundConnections}.
* Which class to use for internode metrics for {@link org.apache.cassandra.net.InboundConnections}.
* The provided class name must point to an implementation of {@link org.apache.cassandra.metrics.InternodeOutboundMetrics}.
*/
CUSTOM_INTERNODE_OUTBOUND_METRICS_PROVIDER_PROPERTY("cassandra.custom_internode_outbound_metrics_provider_class"),

CUSTOM_KEYSPACES_FILTER_PROVIDER("cassandra.custom_keyspaces_filter_provider_class"),
/**
* Which class to use for messaging metrics for {@link org.apache.cassandra.net.MessagingService}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
*/
package org.apache.cassandra.metrics;

import java.lang.reflect.InvocationTargetException;

import com.codahale.metrics.Gauge;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.InboundMessageHandlers;
import org.apache.cassandra.metrics.CassandraMetricsRegistry.MetricName;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_INTERNODE_INBOUND_METRICS_PROVIDER_PROPERTY;

/**
* Metrics for internode connections.
Expand All @@ -42,12 +46,41 @@ public class InternodeInboundMetrics
private final MetricName throttledCount;
private final MetricName throttledNanos;

/**
* Factory method to create metrics for given inbound message handlers.
* This ensures the metrics provider is used when configured.
*
* @param peer IP address and port to use for metrics label
* @param handlers the inbound message handlers
* @return new InternodeInboundMetrics instance
*/
public static InternodeInboundMetrics create(InetAddressAndPort peer, InboundMessageHandlers handlers)
{
if (CUSTOM_INTERNODE_INBOUND_METRICS_PROVIDER_PROPERTY.isPresent())
{
Class<InternodeInboundMetrics> klass = FBUtilities.classForName(CUSTOM_INTERNODE_INBOUND_METRICS_PROVIDER_PROPERTY.getString(), "Internode Inbound Metrics Provider");
try
{
return klass.getDeclaredConstructor(InetAddressAndPort.class, InboundMessageHandlers.class).newInstance(peer, handlers);
}
catch (NoSuchMethodException | InstantiationException | IllegalAccessException |
InvocationTargetException e)
{
throw new RuntimeException(e);
}
}
else
{
return new InternodeInboundMetrics(peer, handlers);
}
}

/**
* Create metrics for given inbound message handlers.
*
* @param peer IP address and port to use for metrics label
*/
public InternodeInboundMetrics(InetAddressAndPort peer, InboundMessageHandlers handlers)
protected InternodeInboundMetrics(InetAddressAndPort peer, InboundMessageHandlers handlers)
{
// ipv6 addresses will contain colons, which are invalid in a JMX ObjectName
MetricNameFactory factory = new DefaultNameFactory("InboundConnection", peer.getHostAddressAndPortForJMX());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public InboundMessageHandlers(InetAddressAndPort self,
largeCallbacks = makeMessageCallbacks(peer, largeCounters, globalMetricCallbacks, messageConsumer);
legacyCallbacks = makeMessageCallbacks(peer, legacyCounters, globalMetricCallbacks, messageConsumer);

metrics = new InternodeInboundMetrics(peer, this);
metrics = InternodeInboundMetrics.create(peer, this);
}

InboundMessageHandler createHandler(FrameDecoder frameDecoder, ConnectionType type, Channel channel, int version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void setupAAVersion()
@After
public void restoreVersionSettings()
{
SAIUtil.setCurrentVersion(Version.CURRENT);
SAIUtil.resetCurrentVersion();
if (defaultImmutableMinVersionSetting != null)
IMMUTABLE_SAI_COMPONENTS_MIN_VERSION.setString(defaultImmutableMinVersionSetting);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.metrics;

import org.junit.After;
import org.junit.Test;

import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.InboundMessageHandlers;

import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_INTERNODE_INBOUND_METRICS_PROVIDER_PROPERTY;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* Test for custom internode inbound metrics provider functionality.
*
*/
public class CustomInternodeInboundMetricsTest
{
@After
public void teardown()
{
System.clearProperty(CUSTOM_INTERNODE_INBOUND_METRICS_PROVIDER_PROPERTY.getKey());
}

@Test
public void testInvalidClassNameThrowsException() throws Exception
{
CUSTOM_INTERNODE_INBOUND_METRICS_PROVIDER_PROPERTY.setString("org.apache.cassandra.metrics.NonExistentClass");

InetAddressAndPort peer = InetAddressAndPort.getByName("127.0.0.1");
InboundMessageHandlers handlers = null;

assertThatThrownBy(() -> InternodeInboundMetrics.create(peer, handlers))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("NonExistentClass");
}

@Test
public void testCustomProviderWithoutProperConstructorThrowsException() throws Exception
{
// When custom provider doesn't have required constructor (InetAddressAndPort, InboundMessageHandlers),
// should throw RuntimeException with NoSuchMethodException as cause
CUSTOM_INTERNODE_INBOUND_METRICS_PROVIDER_PROPERTY.setString(InvalidCustomMetrics.class.getName());

InetAddressAndPort peer = InetAddressAndPort.getByName("127.0.0.1");
InboundMessageHandlers handlers = null;

assertThatThrownBy(() -> InternodeInboundMetrics.create(peer, handlers))
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(NoSuchMethodException.class);
}

@Test
public void testNonPublicConstructorThrowsException() throws Exception
{
// When custom provider has private constructor, should throw RuntimeException with IllegalAccessException
CUSTOM_INTERNODE_INBOUND_METRICS_PROVIDER_PROPERTY.setString(PrivateConstructorMetrics.class.getName());

InetAddressAndPort peer = InetAddressAndPort.getByName("127.0.0.1");
InboundMessageHandlers handlers = null;

assertThatThrownBy(() -> InternodeInboundMetrics.create(peer, handlers))
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(IllegalAccessException.class);
}

@Test
public void testAbstractClassThrowsException() throws Exception
{
// When custom provider is abstract, should throw RuntimeException with InstantiationException
CUSTOM_INTERNODE_INBOUND_METRICS_PROVIDER_PROPERTY.setString(AbstractCustomMetrics.class.getName());

InetAddressAndPort peer = InetAddressAndPort.getByName("127.0.0.1");
InboundMessageHandlers handlers = null;

assertThatThrownBy(() -> InternodeInboundMetrics.create(peer, handlers))
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(InstantiationException.class);
}

/**
* Invalid implementation missing required constructor signature
*/
public static class InvalidCustomMetrics extends InternodeInboundMetrics
{
// Missing required constructor (InetAddressAndPort, InboundMessageHandlers)
public InvalidCustomMetrics(String wrongParam)
{
super(null, null);
}
}

/**
* Invalid implementation with private constructor
*/
public static class PrivateConstructorMetrics extends InternodeInboundMetrics
{
private PrivateConstructorMetrics(InetAddressAndPort peer, InboundMessageHandlers handlers)
{
super(peer, handlers);
}
}

/**
* Invalid abstract implementation
*/
public static abstract class AbstractCustomMetrics extends InternodeInboundMetrics
{
public AbstractCustomMetrics(InetAddressAndPort peer, InboundMessageHandlers handlers)
{
super(peer, handlers);
}
}
}