From 9d194b2bd049cb75516665ae476c3cfe0e217b01 Mon Sep 17 00:00:00 2001 From: Amichai Rothman Date: Fri, 27 Jun 2025 18:05:03 +0300 Subject: [PATCH 1/5] Change TcpInvocationHandler to invoke Object methods locally --- .../aries/rsa/provider/tcp/TcpInvocationHandler.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java index 80fe6477..460bf96a 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java @@ -64,6 +64,16 @@ public TcpInvocationHandler(ClassLoader cl, String host, int port, String endpoi @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + // handle Object methods locally so we can use equals, HashMap, etc. normally + if (method.getDeclaringClass() == Object.class) { + switch (method.getName()) { + case "equals": return proxy == args[0]; + case "hashCode": return System.identityHashCode(proxy); + case "toString": return proxy.getClass().getName() + "@" + + Integer.toHexString(System.identityHashCode(proxy)); + } + } + // handle remote invocation if (Future.class.isAssignableFrom(method.getReturnType()) || CompletionStage.class.isAssignableFrom(method.getReturnType())) { return createFutureResult(method, args); From 55bd34520789ca42c4c69e7e844506d800d98ce3 Mon Sep 17 00:00:00 2001 From: Amichai Rothman Date: Fri, 27 Jun 2025 18:20:44 +0300 Subject: [PATCH 2/5] ARIES-2120 Change DistributionProvider to return a Closeable ImportedService so that resources can be properly released --- .../itests/felix/tcp/TestDiscoveryExport.java | 8 ++-- .../rsa/provider/fastbin/FastBinProvider.java | 12 +++-- .../aries/rsa/provider/tcp/TcpProvider.java | 12 +++-- .../tcp/TcpProviderPrimitiveTest.java | 12 +++-- .../rsa/provider/tcp/TcpProviderTest.java | 27 +++++++---- .../aries/rsa/core/ClientServiceFactory.java | 26 +++++++--- .../rsa/core/ClientServiceFactoryTest.java | 2 +- .../rsa/core/RemoteServiceAdminCoreTest.java | 3 +- .../aries/rsa/spi/DistributionProvider.java | 17 ++++--- .../apache/aries/rsa/spi/ImportedService.java | 48 +++++++++++++++++++ .../apache/aries/rsa/spi/package-info.java | 2 +- 11 files changed, 126 insertions(+), 43 deletions(-) create mode 100644 spi/src/main/java/org/apache/aries/rsa/spi/ImportedService.java diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java index b7bd4cc7..1080bbab 100644 --- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java +++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java @@ -29,6 +29,7 @@ import org.apache.aries.rsa.itests.felix.RsaTestBase; import org.apache.aries.rsa.spi.DistributionProvider; import org.apache.aries.rsa.spi.EndpointDescriptionParser; +import org.apache.aries.rsa.spi.ImportedService; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.hamcrest.Matchers; @@ -67,10 +68,11 @@ public static Option[] configure() throws Exception { @Test public void testDiscoveryExport() throws Exception { EndpointDescription epd = getEndpoint(); - EchoService service = (EchoService)tcpProvider - .importEndpoint(EchoService.class.getClassLoader(), - bundleContext, new Class[]{EchoService.class}, epd); + ImportedService importedService = tcpProvider.importEndpoint(EchoService.class.getClassLoader(), + bundleContext, new Class[]{EchoService.class}, epd); + EchoService service = (EchoService)importedService.getService(); Assert.assertEquals("test", service.echo("test")); + importedService.close(); } private EndpointDescription getEndpoint() throws Exception { diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java index 5148a95e..997dcf42 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java @@ -35,6 +35,7 @@ import org.apache.aries.rsa.provider.fastbin.util.UuidGenerator; import org.apache.aries.rsa.spi.DistributionProvider; import org.apache.aries.rsa.spi.Endpoint; +import org.apache.aries.rsa.spi.ImportedService; import org.apache.aries.rsa.spi.IntentUnsatisfiedException; import org.fusesource.hawtdispatch.Dispatch; import org.fusesource.hawtdispatch.DispatchQueue; @@ -172,15 +173,16 @@ public void close() throws IOException { } @Override - public Object importEndpoint(ClassLoader cl, - BundleContext consumerContext, - Class[] interfaces, - EndpointDescription endpoint) + public ImportedService importEndpoint(ClassLoader cl, + BundleContext consumerContext, + Class[] interfaces, + EndpointDescription endpoint) throws IntentUnsatisfiedException { String address = (String) endpoint.getProperties().get(FASTBIN_ADDRESS); InvocationHandler handler = client.getProxy(address, endpoint.getId(), cl); - return Proxy.newProxyInstance(cl, interfaces, handler); + Object service = Proxy.newProxyInstance(cl, interfaces, handler); + return () -> service; } } diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java index 29e404ff..b836d176 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java @@ -32,6 +32,7 @@ import org.apache.aries.rsa.annotations.RSADistributionProvider; import org.apache.aries.rsa.spi.DistributionProvider; import org.apache.aries.rsa.spi.Endpoint; +import org.apache.aries.rsa.spi.ImportedService; import org.apache.aries.rsa.spi.IntentUnsatisfiedException; import org.apache.aries.rsa.util.StringPlus; import org.osgi.framework.BundleContext; @@ -126,17 +127,18 @@ private synchronized void removeServer(TcpEndpoint endpoint) { } @Override - public Object importEndpoint(ClassLoader cl, - BundleContext consumerContext, - Class[] interfaces, - EndpointDescription endpoint) + public ImportedService importEndpoint(ClassLoader cl, + BundleContext consumerContext, + Class[] interfaces, + EndpointDescription endpoint) throws IntentUnsatisfiedException { try { String endpointId = endpoint.getId(); URI address = new URI(endpointId); int timeout = new EndpointPropertiesParser(endpoint).getTimeoutMillis(); InvocationHandler handler = new TcpInvocationHandler(cl, address.getHost(), address.getPort(), endpointId, timeout); - return Proxy.newProxyInstance(cl, interfaces, handler); + Object service = Proxy.newProxyInstance(cl, interfaces, handler); + return () -> service; } catch (Exception e) { throw new RuntimeException(e); } diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java index 5ddd77d4..4591a286 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java @@ -38,6 +38,7 @@ import org.apache.aries.rsa.provider.tcp.myservice.PrimitiveService; import org.apache.aries.rsa.provider.tcp.myservice.PrimitiveServiceImpl; import org.apache.aries.rsa.spi.Endpoint; +import org.apache.aries.rsa.spi.ImportedService; import org.apache.aries.rsa.util.EndpointHelper; import org.easymock.EasyMock; import org.junit.AfterClass; @@ -52,6 +53,7 @@ public class TcpProviderPrimitiveTest { private static PrimitiveService myServiceProxy; private static Endpoint ep; + private static ImportedService importedService; @BeforeClass public static void createServerAndProxy() { @@ -66,10 +68,11 @@ public static void createServerAndProxy() { ep = provider.exportService(myService, bc, props, exportedInterfaces); assertThat(ep.description().getId(), startsWith("tcp://localhost:")); System.out.println(ep.description()); - myServiceProxy = (PrimitiveService)provider.importEndpoint(PrimitiveService.class.getClassLoader(), - bc, - exportedInterfaces, - ep.description()); + importedService = provider.importEndpoint(PrimitiveService.class.getClassLoader(), + bc, + exportedInterfaces, + ep.description()); + myServiceProxy = (PrimitiveService)importedService.getService(); } @Test @@ -164,6 +167,7 @@ public void testDTOAr() { @AfterClass public static void close() throws IOException { + importedService.close(); ep.close(); } diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java index 6c39613b..9f0f376d 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java @@ -48,6 +48,7 @@ import org.apache.aries.rsa.provider.tcp.myservice.MyService; import org.apache.aries.rsa.provider.tcp.myservice.MyServiceImpl; import org.apache.aries.rsa.spi.Endpoint; +import org.apache.aries.rsa.spi.ImportedService; import org.apache.aries.rsa.util.EndpointHelper; import org.easymock.EasyMock; import org.junit.After; @@ -70,6 +71,8 @@ public class TcpProviderTest { private MyService myServiceProxy2; private Endpoint ep; private Endpoint ep2; + private ImportedService importedService; + private ImportedService importedService2; protected static int getFreePort() throws IOException { try (ServerSocket socket = new ServerSocket()) { @@ -96,21 +99,25 @@ public void createServerAndProxy() throws IOException { props.put("aries.rsa.id", "service2"); ep2 = provider.exportService(new MyServiceImpl("service2"), bc, props, exportedInterfaces); assertThat(ep.description().getId(), startsWith("tcp://localhost:")); - myServiceProxy = (MyService)provider.importEndpoint( - MyService.class.getClassLoader(), - bc, - exportedInterfaces, - ep.description()); - myServiceProxy2 = (MyService)provider.importEndpoint( - MyService.class.getClassLoader(), - bc, - exportedInterfaces, - ep2.description()); + importedService = provider.importEndpoint( + MyService.class.getClassLoader(), + bc, + exportedInterfaces, + ep.description()); + myServiceProxy = (MyService)importedService.getService(); + importedService2 = provider.importEndpoint( + MyService.class.getClassLoader(), + bc, + exportedInterfaces, + ep2.description()); + myServiceProxy2 = (MyService)importedService2.getService(); } @After public void close() throws IOException { + importedService.close(); + importedService2.close(); ep.close(); ep2.close(); } diff --git a/rsa/src/main/java/org/apache/aries/rsa/core/ClientServiceFactory.java b/rsa/src/main/java/org/apache/aries/rsa/core/ClientServiceFactory.java index 4a733418..869a1dfc 100644 --- a/rsa/src/main/java/org/apache/aries/rsa/core/ClientServiceFactory.java +++ b/rsa/src/main/java/org/apache/aries/rsa/core/ClientServiceFactory.java @@ -18,12 +18,16 @@ */ package org.apache.aries.rsa.core; +import java.io.IOException; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.aries.rsa.spi.DistributionProvider; +import org.apache.aries.rsa.spi.ImportedService; import org.apache.aries.rsa.spi.IntentUnsatisfiedException; import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; @@ -44,7 +48,7 @@ public class ClientServiceFactory implements ServiceFactory { private ImportRegistrationImpl importRegistration; private boolean closeable; - private int serviceCounter; + private Map services = new HashMap<>(); public ClientServiceFactory(EndpointDescription endpoint, DistributionProvider handler, ImportRegistrationImpl ir) { @@ -63,15 +67,16 @@ public Object getService(final Bundle requestingBundle, final ServiceRegistratio for (String ifaceName : interfaceNames) { interfaces.add(consumerLoader.loadClass(ifaceName)); } - Object proxy = AccessController.doPrivileged(new PrivilegedAction() { - public Object run() { + ImportedService importedService = AccessController.doPrivileged(new PrivilegedAction() { + public ImportedService run() { Class[] ifAr = interfaces.toArray(new Class[]{}); return handler.importEndpoint(consumerLoader, consumerContext, ifAr, endpoint); } }); + Object proxy = importedService.getService(); synchronized (this) { - serviceCounter++; + services.put(proxy, importedService); } return proxy; } catch (IntentUnsatisfiedException iue) { @@ -85,8 +90,15 @@ public Object run() { public void ungetService(Bundle requestingBundle, ServiceRegistration sreg, Object serviceObject) { synchronized (this) { - serviceCounter--; - LOG.debug("Services still provided by this ServiceFactory: {}", serviceCounter); + ImportedService importedService = services.remove(serviceObject); + if (importedService != null) { + try { + importedService.close(); + } catch (IOException e) { + LOG.warn("Problem closing imported service proxy {} for {}", serviceObject, requestingBundle, e); + } + } + LOG.debug("Services still provided by this ServiceFactory: {}", services.size()); closeIfUnused(); } } @@ -99,7 +111,7 @@ public void setCloseable(boolean closeable) { } private synchronized void closeIfUnused() { - if (serviceCounter <= 0 && closeable) { + if (services.isEmpty() && closeable) { importRegistration.closeAll(); } } diff --git a/rsa/src/test/java/org/apache/aries/rsa/core/ClientServiceFactoryTest.java b/rsa/src/test/java/org/apache/aries/rsa/core/ClientServiceFactoryTest.java index 2f45d13f..51936261 100644 --- a/rsa/src/test/java/org/apache/aries/rsa/core/ClientServiceFactoryTest.java +++ b/rsa/src/test/java/org/apache/aries/rsa/core/ClientServiceFactoryTest.java @@ -73,7 +73,7 @@ private DistributionProvider mockDistributionProvider(final Object proxy) { EasyMock.expect(handler.importEndpoint(anyObject(ClassLoader.class), anyObject(BundleContext.class), isA(Class[].class), - anyObject(EndpointDescription.class))).andReturn(proxy); + anyObject(EndpointDescription.class))).andReturn(() -> proxy); EasyMock.replay(handler); return handler; } diff --git a/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java b/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java index 22aca0a8..bbccfb27 100644 --- a/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java +++ b/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java @@ -43,6 +43,7 @@ import org.apache.aries.rsa.core.event.EventProducer; import org.apache.aries.rsa.spi.DistributionProvider; import org.apache.aries.rsa.spi.Endpoint; +import org.apache.aries.rsa.spi.ImportedService; import org.easymock.EasyMock; import org.easymock.IAnswer; import org.easymock.IMocksControl; @@ -429,7 +430,7 @@ public Endpoint exportService(Object serviceO, BundleContext serviceContext, } @Override - public Object importEndpoint(ClassLoader cl, BundleContext consumerContext, Class[] interfaces, + public ImportedService importEndpoint(ClassLoader cl, BundleContext consumerContext, Class[] interfaces, EndpointDescription endpoint) { return null; } diff --git a/spi/src/main/java/org/apache/aries/rsa/spi/DistributionProvider.java b/spi/src/main/java/org/apache/aries/rsa/spi/DistributionProvider.java index 4efd15ae..b23e9148 100644 --- a/spi/src/main/java/org/apache/aries/rsa/spi/DistributionProvider.java +++ b/spi/src/main/java/org/apache/aries/rsa/spi/DistributionProvider.java @@ -30,7 +30,7 @@ public interface DistributionProvider { /** * Called by RemoteServiceAdmin to export a service. - * + *

* The Distribution provider will be called if no config type was set or * if it supports the config type. * @@ -46,14 +46,19 @@ Endpoint exportService(Object serviceO, Class[] exportedInterfaces); /** + * Called by RemoteServiceAdmin to import a service, + * i.e. get a proxy that can be used to access the remote service. + *

* @param cl classloader of the consumer bundle * @param consumerContext bundle context of the consumer bundle * @param interfaces interfaces of the service to proxy * @param endpoint description of the remote endpoint - * @return service proxy to be given to the requesting bundle + * @return an ImportedService that provides the service proxy + * to be given to the requesting bundle, and can be closed + * when the service is no longer used */ - Object importEndpoint(ClassLoader cl, - BundleContext consumerContext, - Class[] interfaces, - EndpointDescription endpoint); + ImportedService importEndpoint(ClassLoader cl, + BundleContext consumerContext, + Class[] interfaces, + EndpointDescription endpoint); } diff --git a/spi/src/main/java/org/apache/aries/rsa/spi/ImportedService.java b/spi/src/main/java/org/apache/aries/rsa/spi/ImportedService.java new file mode 100644 index 00000000..b733a881 --- /dev/null +++ b/spi/src/main/java/org/apache/aries/rsa/spi/ImportedService.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.spi; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Wraps an imported service proxy, while allowing it to be properly + * closed to release resources, close connections, etc. + */ +public interface ImportedService extends Closeable { + + /** + * Returns the service proxy to be used by consumer bundles. + * + * @return the service proxy + */ + Object getService(); + + /** + * Close the service and release its resources. + *

+ * Implementations should override this to release resources + * associated with the service, close connections, etc. + * when the service is no longer used. + * + * @throws IOException if an error occurs + */ + default void close() throws IOException { + } +} diff --git a/spi/src/main/java/org/apache/aries/rsa/spi/package-info.java b/spi/src/main/java/org/apache/aries/rsa/spi/package-info.java index b622a797..de124fcd 100644 --- a/spi/src/main/java/org/apache/aries/rsa/spi/package-info.java +++ b/spi/src/main/java/org/apache/aries/rsa/spi/package-info.java @@ -18,5 +18,5 @@ */ @org.osgi.annotation.bundle.Export -@org.osgi.annotation.versioning.Version("1.1.0") +@org.osgi.annotation.versioning.Version("2.0.0") package org.apache.aries.rsa.spi; \ No newline at end of file From 47105d614887e478e79e10d46791307cd50c3a31 Mon Sep 17 00:00:00 2001 From: Amichai Rothman Date: Wed, 2 Jul 2025 10:09:54 +0300 Subject: [PATCH 3/5] Improve TcpProvider performance test --- .../apache/aries/rsa/provider/tcp/TcpProviderTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java index 9f0f376d..64867636 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java @@ -64,7 +64,7 @@ public class TcpProviderTest { private static final int TIMEOUT = 200; - private static final int NUM_CALLS = 100; + private static final int NUM_CALLS = 2000; // increase this manually to find the max throughput private static final int NUM_THREADS = 10; private MyService myServiceProxy; @@ -268,8 +268,10 @@ public void run() { } executor.shutdown(); executor.awaitTermination(100, TimeUnit.SECONDS); - long tps = NUM_CALLS * 1000 / (System.currentTimeMillis() - start); - System.out.println(tps + " tps"); + long cps = NUM_CALLS * 1000 / (System.currentTimeMillis() - start); + long cpst = cps / NUM_THREADS; + System.out.println(cpst + " calls per second on each thread"); + System.out.println(cps + " calls per second total on " + NUM_THREADS + " threads"); } } From 48531b5c0885b89dc0bc7ba5f8e7f5ccb6173c18 Mon Sep 17 00:00:00 2001 From: Amichai Rothman Date: Mon, 30 Jun 2025 19:14:18 +0300 Subject: [PATCH 4/5] ARIES 2121 Add TCP provider connection persistence with connection pool (~100x throughput) --- .../provider/tcp/TcpInvocationHandler.java | 131 +++++++++++++++--- .../aries/rsa/provider/tcp/TcpProvider.java | 15 +- .../aries/rsa/provider/tcp/TcpServer.java | 24 ++-- 3 files changed, 137 insertions(+), 33 deletions(-) diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java index 460bf96a..0c7179b2 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java @@ -18,16 +18,18 @@ */ package org.apache.aries.rsa.provider.tcp; +import java.io.Closeable; import java.io.IOException; -import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.Socket; -import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Iterator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Future; @@ -46,15 +48,32 @@ * which sends the details of the method invocations * over a TCP connection, to be executed by the remote service. */ -public class TcpInvocationHandler implements InvocationHandler { +public class TcpInvocationHandler implements InvocationHandler, Closeable { + + private static class Connection { + Socket socket; + BasicObjectOutputStream out; + BasicObjectInputStream in; + + public Connection(Socket socket) throws IOException { + this.socket = socket; + out = new BasicObjectOutputStream(socket.getOutputStream()); + in = new BasicObjectInputStream(socket.getInputStream()); + } + } + private String host; private int port; private String endpointId; private ClassLoader cl; private int timeoutMillis; + private final Deque pool = new ArrayDeque<>(); + private int acquired; // counts connections currently in use (not in pool) + private boolean closed; + public TcpInvocationHandler(ClassLoader cl, String host, int port, String endpointId, int timeoutMillis) - throws UnknownHostException, IOException { + throws UnknownHostException, IOException { this.cl = cl; this.host = host; this.port = port; @@ -62,6 +81,66 @@ public TcpInvocationHandler(ClassLoader cl, String host, int port, String endpoi this.timeoutMillis = timeoutMillis; } + private Connection acquireConnection() throws IOException { + Connection conn; + synchronized (pool) { + acquired++; // must be first + if (closed) { + throw new IOException("Connection pool is closed"); + } + conn = pool.pollFirst(); // reuse most recently used connection + } + // if the pool is empty, create a new connection + if (conn == null) { + conn = new Connection(openSocket()); + conn.socket.setSoTimeout(timeoutMillis); + conn.socket.setTcpNoDelay(true); + conn.in.addClassLoader(cl); + conn.out.writeUTF(endpointId); // select endpoint for this connection + } + return conn; + } + + // must be called exactly once for each call to acquireConnection, + // regardless of the outcome - if there was an error, pass null + private void releaseConnection(Connection conn) { + synchronized (pool) { + acquired--; // must be first + if (conn != null) { + pool.offerFirst(conn); // add to front of queue so old idle ones can expire + } + pool.notifyAll(); + } + } + + private void closeConnection(Connection conn) throws IOException { + if (conn != null) { + conn.socket.close(); + } + } + + private void closeConnections() throws IOException { + synchronized (pool) { + closed = true; // first prevent acquiring new connections + while (true) { + // close all idle connections + for (Iterator it = pool.iterator(); it.hasNext(); ) { + closeConnection(it.next()); + it.remove(); + } + if (acquired == 0) { + break; // all closed + } + // wait for additional active connections to be released + try { + pool.wait(); + } catch (InterruptedException ie) { + throw new IOException("interrupted while closing connections", ie); + } + } + } + } + @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // handle Object methods locally so we can use equals, HashMap, etc. normally @@ -115,33 +194,37 @@ public void run() { } private Object handleSyncCall(Method method, Object[] args) throws Throwable { + Connection conn = null; Throwable error; Object result; - try ( - Socket socket = openSocket(); - ObjectOutputStream out = new BasicObjectOutputStream(socket.getOutputStream()) - ) { - socket.setSoTimeout(timeoutMillis); - out.writeUTF(endpointId); - out.writeObject(method.getName()); - out.writeObject(args); - out.flush(); - - try (BasicObjectInputStream in = new BasicObjectInputStream(socket.getInputStream())) { - in.addClassLoader(cl); - error = (Throwable) in.readObject(); - result = readReplaceVersion(in.readObject()); - } + + try { + conn = acquireConnection(); + + // write invocation data + conn.out.writeObject(method.getName()); + conn.out.writeObject(args); + conn.out.flush(); + conn.out.reset(); + // read result data + error = (Throwable)conn.in.readObject(); + result = readReplaceVersion(conn.in.readObject()); + if (error == null) return result; else if (error instanceof InvocationTargetException) error = error.getCause(); // exception thrown from remotely invoked method (not our problem) else throw error; // exception thrown by provider itself - } catch (SocketTimeoutException e) { - throw new ServiceException("Timeout calling " + host + ":" + port + " method: " + method.getName(), ServiceException.REMOTE, e); } catch (Throwable e) { - throw new ServiceException("Error calling " + host + ":" + port + " method: " + method.getName(), ServiceException.REMOTE, e); + // this can be an unexpected error from remote (not from the invoked method itself + // but somewhere in the provider processing), or a communications error (e.g. timeout) - + // in either case we don't know what was written or not, so we must abort the connection + closeConnection(conn); + conn = null; // don't return it to the pool + throw new ServiceException("Error invoking " + method.getName() + " on " + endpointId, ServiceException.REMOTE, e); + } finally { + releaseConnection(conn); } throw error; } @@ -168,4 +251,8 @@ private Object readReplaceVersion(Object readObject) { } } + @Override + public void close() throws IOException { + closeConnections(); + } } diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java index b836d176..f1685936 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java @@ -19,7 +19,6 @@ package org.apache.aries.rsa.provider.tcp; import java.io.IOException; -import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; import java.net.URI; import java.util.Arrays; @@ -136,9 +135,19 @@ public ImportedService importEndpoint(ClassLoader cl, String endpointId = endpoint.getId(); URI address = new URI(endpointId); int timeout = new EndpointPropertiesParser(endpoint).getTimeoutMillis(); - InvocationHandler handler = new TcpInvocationHandler(cl, address.getHost(), address.getPort(), endpointId, timeout); + TcpInvocationHandler handler = new TcpInvocationHandler(cl, address.getHost(), address.getPort(), endpointId, timeout); Object service = Proxy.newProxyInstance(cl, interfaces, handler); - return () -> service; + return new ImportedService() { + @Override + public Object getService() { + return service; + } + + @Override + public void close() throws IOException { + handler.close(); + } + }; } catch (Exception e) { throw new RuntimeException(e); } diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpServer.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpServer.java index 4f01cbf9..f40ec1b5 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpServer.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpServer.java @@ -19,6 +19,7 @@ package org.apache.aries.rsa.provider.tcp; import java.io.Closeable; +import java.io.EOFException; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -27,6 +28,7 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; +import java.net.SocketTimeoutException; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -113,20 +115,24 @@ public void run() { } private void handleConnection(Socket socket) { - try (Socket sock = socket; - BasicObjectInputStream in = new BasicObjectInputStream(socket.getInputStream()); - ObjectOutputStream out = new BasicObjectOutputStream(socket.getOutputStream())) { + try (Socket sock = socket; // socket will be closed when done + ObjectOutputStream out = new BasicObjectOutputStream(socket.getOutputStream()); + BasicObjectInputStream in = new BasicObjectInputStream(socket.getInputStream())) { + socket.setTcpNoDelay(true); String endpointId = in.readUTF(); MethodInvoker invoker = invokers.get(endpointId); if (invoker == null) throw new IllegalArgumentException("invalid endpoint: " + endpointId); in.addClassLoader(invoker.getService().getClass().getClassLoader()); - handleCall(invoker, in, out); - } catch (SocketException se) { - return; // e.g. connection closed by client - } catch (Exception e) { - LOG.warn("Error processing service call", e); + while (running) { + handleCall(invoker, in, out); + } + } catch (SocketException | SocketTimeoutException | EOFException se) { + return; // e.g. connection closed by client or read timeout due to inactivity + } catch (Throwable t) { + LOG.warn("Error processing service call", t); } + // connection is now closed and thread is done } private void handleCall(MethodInvoker invoker, ObjectInputStream in, ObjectOutputStream out) throws Exception { @@ -141,6 +147,8 @@ private void handleCall(MethodInvoker invoker, ObjectInputStream in, ObjectOutpu } out.writeObject(error); out.writeObject(result); + out.flush(); + out.reset(); } @SuppressWarnings("unchecked") From 4a6a85dc6735e0089ce03d996e63a0eae50c1da9 Mon Sep 17 00:00:00 2001 From: Amichai Rothman Date: Tue, 1 Jul 2025 00:43:57 +0300 Subject: [PATCH 5/5] Add retries when connection is stale --- .../provider/tcp/TcpInvocationHandler.java | 44 ++++++++++++++----- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java index 0c7179b2..db918682 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java @@ -24,6 +24,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.Socket; +import java.net.SocketException; import java.net.UnknownHostException; import java.security.AccessController; import java.security.PrivilegedAction; @@ -141,6 +142,12 @@ private void closeConnections() throws IOException { } } + private int getPoolSize() { + synchronized (pool) { + return pool.size() + acquired; // both idle and active + } + } + @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // handle Object methods locally so we can use equals, HashMap, etc. normally @@ -195,20 +202,33 @@ public void run() { private Object handleSyncCall(Method method, Object[] args) throws Throwable { Connection conn = null; - Throwable error; - Object result; + Throwable error = null; + Object result = null; try { - conn = acquireConnection(); - - // write invocation data - conn.out.writeObject(method.getName()); - conn.out.writeObject(args); - conn.out.flush(); - conn.out.reset(); - // read result data - error = (Throwable)conn.in.readObject(); - result = readReplaceVersion(conn.in.readObject()); + // try at most all existing connections (which may be stale) plus one new + for (int attempts = getPoolSize() + 1; attempts > 0; attempts--) { + conn = acquireConnection(); // get or create pool connection + try { + // write invocation data + conn.out.writeObject(method.getName()); + conn.out.writeObject(args); + conn.out.flush(); + conn.out.reset(); + // read result data + error = (Throwable) conn.in.readObject(); + result = readReplaceVersion(conn.in.readObject()); + break; // transaction completed + } catch (SocketException se) { // catch only read/write exceptions here - only stale connections + if (attempts == 1) { + throw se; // failed last attempt - propagate the error + } + // the server socket was previously open, but now failed - + // communication error or server socket was closed (e.g. idle timeout) + // so we retry with another connection + releaseConnection(null); // dispose of it before next attempt + } + } if (error == null) return result;