diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java index b7bd4cc78..1080bbab4 100644 --- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java +++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java @@ -29,6 +29,7 @@ import org.apache.aries.rsa.itests.felix.RsaTestBase; import org.apache.aries.rsa.spi.DistributionProvider; import org.apache.aries.rsa.spi.EndpointDescriptionParser; +import org.apache.aries.rsa.spi.ImportedService; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.hamcrest.Matchers; @@ -67,10 +68,11 @@ public static Option[] configure() throws Exception { @Test public void testDiscoveryExport() throws Exception { EndpointDescription epd = getEndpoint(); - EchoService service = (EchoService)tcpProvider - .importEndpoint(EchoService.class.getClassLoader(), - bundleContext, new Class[]{EchoService.class}, epd); + ImportedService importedService = tcpProvider.importEndpoint(EchoService.class.getClassLoader(), + bundleContext, new Class[]{EchoService.class}, epd); + EchoService service = (EchoService)importedService.getService(); Assert.assertEquals("test", service.echo("test")); + importedService.close(); } private EndpointDescription getEndpoint() throws Exception { diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java index 5148a95ef..997dcf42b 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java @@ -35,6 +35,7 @@ import org.apache.aries.rsa.provider.fastbin.util.UuidGenerator; import org.apache.aries.rsa.spi.DistributionProvider; import org.apache.aries.rsa.spi.Endpoint; +import org.apache.aries.rsa.spi.ImportedService; import org.apache.aries.rsa.spi.IntentUnsatisfiedException; import org.fusesource.hawtdispatch.Dispatch; import org.fusesource.hawtdispatch.DispatchQueue; @@ -172,15 +173,16 @@ public void close() throws IOException { } @Override - public Object importEndpoint(ClassLoader cl, - BundleContext consumerContext, - Class[] interfaces, - EndpointDescription endpoint) + public ImportedService importEndpoint(ClassLoader cl, + BundleContext consumerContext, + Class[] interfaces, + EndpointDescription endpoint) throws IntentUnsatisfiedException { String address = (String) endpoint.getProperties().get(FASTBIN_ADDRESS); InvocationHandler handler = client.getProxy(address, endpoint.getId(), cl); - return Proxy.newProxyInstance(cl, interfaces, handler); + Object service = Proxy.newProxyInstance(cl, interfaces, handler); + return () -> service; } } diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java index 80fe64778..460bf96aa 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java @@ -64,6 +64,16 @@ public TcpInvocationHandler(ClassLoader cl, String host, int port, String endpoi @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + // handle Object methods locally so we can use equals, HashMap, etc. normally + if (method.getDeclaringClass() == Object.class) { + switch (method.getName()) { + case "equals": return proxy == args[0]; + case "hashCode": return System.identityHashCode(proxy); + case "toString": return proxy.getClass().getName() + "@" + + Integer.toHexString(System.identityHashCode(proxy)); + } + } + // handle remote invocation if (Future.class.isAssignableFrom(method.getReturnType()) || CompletionStage.class.isAssignableFrom(method.getReturnType())) { return createFutureResult(method, args); diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java index 29e404ff1..b836d1760 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpProvider.java @@ -32,6 +32,7 @@ import org.apache.aries.rsa.annotations.RSADistributionProvider; import org.apache.aries.rsa.spi.DistributionProvider; import org.apache.aries.rsa.spi.Endpoint; +import org.apache.aries.rsa.spi.ImportedService; import org.apache.aries.rsa.spi.IntentUnsatisfiedException; import org.apache.aries.rsa.util.StringPlus; import org.osgi.framework.BundleContext; @@ -126,17 +127,18 @@ private synchronized void removeServer(TcpEndpoint endpoint) { } @Override - public Object importEndpoint(ClassLoader cl, - BundleContext consumerContext, - Class[] interfaces, - EndpointDescription endpoint) + public ImportedService importEndpoint(ClassLoader cl, + BundleContext consumerContext, + Class[] interfaces, + EndpointDescription endpoint) throws IntentUnsatisfiedException { try { String endpointId = endpoint.getId(); URI address = new URI(endpointId); int timeout = new EndpointPropertiesParser(endpoint).getTimeoutMillis(); InvocationHandler handler = new TcpInvocationHandler(cl, address.getHost(), address.getPort(), endpointId, timeout); - return Proxy.newProxyInstance(cl, interfaces, handler); + Object service = Proxy.newProxyInstance(cl, interfaces, handler); + return () -> service; } catch (Exception e) { throw new RuntimeException(e); } diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java index 5ddd77d4b..4591a2865 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java @@ -38,6 +38,7 @@ import org.apache.aries.rsa.provider.tcp.myservice.PrimitiveService; import org.apache.aries.rsa.provider.tcp.myservice.PrimitiveServiceImpl; import org.apache.aries.rsa.spi.Endpoint; +import org.apache.aries.rsa.spi.ImportedService; import org.apache.aries.rsa.util.EndpointHelper; import org.easymock.EasyMock; import org.junit.AfterClass; @@ -52,6 +53,7 @@ public class TcpProviderPrimitiveTest { private static PrimitiveService myServiceProxy; private static Endpoint ep; + private static ImportedService importedService; @BeforeClass public static void createServerAndProxy() { @@ -66,10 +68,11 @@ public static void createServerAndProxy() { ep = provider.exportService(myService, bc, props, exportedInterfaces); assertThat(ep.description().getId(), startsWith("tcp://localhost:")); System.out.println(ep.description()); - myServiceProxy = (PrimitiveService)provider.importEndpoint(PrimitiveService.class.getClassLoader(), - bc, - exportedInterfaces, - ep.description()); + importedService = provider.importEndpoint(PrimitiveService.class.getClassLoader(), + bc, + exportedInterfaces, + ep.description()); + myServiceProxy = (PrimitiveService)importedService.getService(); } @Test @@ -164,6 +167,7 @@ public void testDTOAr() { @AfterClass public static void close() throws IOException { + importedService.close(); ep.close(); } diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java index 6c39613bc..9f0f376d2 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java @@ -48,6 +48,7 @@ import org.apache.aries.rsa.provider.tcp.myservice.MyService; import org.apache.aries.rsa.provider.tcp.myservice.MyServiceImpl; import org.apache.aries.rsa.spi.Endpoint; +import org.apache.aries.rsa.spi.ImportedService; import org.apache.aries.rsa.util.EndpointHelper; import org.easymock.EasyMock; import org.junit.After; @@ -70,6 +71,8 @@ public class TcpProviderTest { private MyService myServiceProxy2; private Endpoint ep; private Endpoint ep2; + private ImportedService importedService; + private ImportedService importedService2; protected static int getFreePort() throws IOException { try (ServerSocket socket = new ServerSocket()) { @@ -96,21 +99,25 @@ public void createServerAndProxy() throws IOException { props.put("aries.rsa.id", "service2"); ep2 = provider.exportService(new MyServiceImpl("service2"), bc, props, exportedInterfaces); assertThat(ep.description().getId(), startsWith("tcp://localhost:")); - myServiceProxy = (MyService)provider.importEndpoint( - MyService.class.getClassLoader(), - bc, - exportedInterfaces, - ep.description()); - myServiceProxy2 = (MyService)provider.importEndpoint( - MyService.class.getClassLoader(), - bc, - exportedInterfaces, - ep2.description()); + importedService = provider.importEndpoint( + MyService.class.getClassLoader(), + bc, + exportedInterfaces, + ep.description()); + myServiceProxy = (MyService)importedService.getService(); + importedService2 = provider.importEndpoint( + MyService.class.getClassLoader(), + bc, + exportedInterfaces, + ep2.description()); + myServiceProxy2 = (MyService)importedService2.getService(); } @After public void close() throws IOException { + importedService.close(); + importedService2.close(); ep.close(); ep2.close(); } diff --git a/rsa/src/main/java/org/apache/aries/rsa/core/ClientServiceFactory.java b/rsa/src/main/java/org/apache/aries/rsa/core/ClientServiceFactory.java index 4a7334189..869a1dfc7 100644 --- a/rsa/src/main/java/org/apache/aries/rsa/core/ClientServiceFactory.java +++ b/rsa/src/main/java/org/apache/aries/rsa/core/ClientServiceFactory.java @@ -18,12 +18,16 @@ */ package org.apache.aries.rsa.core; +import java.io.IOException; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.aries.rsa.spi.DistributionProvider; +import org.apache.aries.rsa.spi.ImportedService; import org.apache.aries.rsa.spi.IntentUnsatisfiedException; import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; @@ -44,7 +48,7 @@ public class ClientServiceFactory implements ServiceFactory { private ImportRegistrationImpl importRegistration; private boolean closeable; - private int serviceCounter; + private Map services = new HashMap<>(); public ClientServiceFactory(EndpointDescription endpoint, DistributionProvider handler, ImportRegistrationImpl ir) { @@ -63,15 +67,16 @@ public Object getService(final Bundle requestingBundle, final ServiceRegistratio for (String ifaceName : interfaceNames) { interfaces.add(consumerLoader.loadClass(ifaceName)); } - Object proxy = AccessController.doPrivileged(new PrivilegedAction() { - public Object run() { + ImportedService importedService = AccessController.doPrivileged(new PrivilegedAction() { + public ImportedService run() { Class[] ifAr = interfaces.toArray(new Class[]{}); return handler.importEndpoint(consumerLoader, consumerContext, ifAr, endpoint); } }); + Object proxy = importedService.getService(); synchronized (this) { - serviceCounter++; + services.put(proxy, importedService); } return proxy; } catch (IntentUnsatisfiedException iue) { @@ -85,8 +90,15 @@ public Object run() { public void ungetService(Bundle requestingBundle, ServiceRegistration sreg, Object serviceObject) { synchronized (this) { - serviceCounter--; - LOG.debug("Services still provided by this ServiceFactory: {}", serviceCounter); + ImportedService importedService = services.remove(serviceObject); + if (importedService != null) { + try { + importedService.close(); + } catch (IOException e) { + LOG.warn("Problem closing imported service proxy {} for {}", serviceObject, requestingBundle, e); + } + } + LOG.debug("Services still provided by this ServiceFactory: {}", services.size()); closeIfUnused(); } } @@ -99,7 +111,7 @@ public void setCloseable(boolean closeable) { } private synchronized void closeIfUnused() { - if (serviceCounter <= 0 && closeable) { + if (services.isEmpty() && closeable) { importRegistration.closeAll(); } } diff --git a/rsa/src/test/java/org/apache/aries/rsa/core/ClientServiceFactoryTest.java b/rsa/src/test/java/org/apache/aries/rsa/core/ClientServiceFactoryTest.java index 2f45d13f4..51936261c 100644 --- a/rsa/src/test/java/org/apache/aries/rsa/core/ClientServiceFactoryTest.java +++ b/rsa/src/test/java/org/apache/aries/rsa/core/ClientServiceFactoryTest.java @@ -73,7 +73,7 @@ private DistributionProvider mockDistributionProvider(final Object proxy) { EasyMock.expect(handler.importEndpoint(anyObject(ClassLoader.class), anyObject(BundleContext.class), isA(Class[].class), - anyObject(EndpointDescription.class))).andReturn(proxy); + anyObject(EndpointDescription.class))).andReturn(() -> proxy); EasyMock.replay(handler); return handler; } diff --git a/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java b/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java index 22aca0a83..bbccfb27f 100644 --- a/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java +++ b/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java @@ -43,6 +43,7 @@ import org.apache.aries.rsa.core.event.EventProducer; import org.apache.aries.rsa.spi.DistributionProvider; import org.apache.aries.rsa.spi.Endpoint; +import org.apache.aries.rsa.spi.ImportedService; import org.easymock.EasyMock; import org.easymock.IAnswer; import org.easymock.IMocksControl; @@ -429,7 +430,7 @@ public Endpoint exportService(Object serviceO, BundleContext serviceContext, } @Override - public Object importEndpoint(ClassLoader cl, BundleContext consumerContext, Class[] interfaces, + public ImportedService importEndpoint(ClassLoader cl, BundleContext consumerContext, Class[] interfaces, EndpointDescription endpoint) { return null; } diff --git a/spi/src/main/java/org/apache/aries/rsa/spi/DistributionProvider.java b/spi/src/main/java/org/apache/aries/rsa/spi/DistributionProvider.java index 4efd15aee..b23e91480 100644 --- a/spi/src/main/java/org/apache/aries/rsa/spi/DistributionProvider.java +++ b/spi/src/main/java/org/apache/aries/rsa/spi/DistributionProvider.java @@ -30,7 +30,7 @@ public interface DistributionProvider { /** * Called by RemoteServiceAdmin to export a service. - * + *

* The Distribution provider will be called if no config type was set or * if it supports the config type. * @@ -46,14 +46,19 @@ Endpoint exportService(Object serviceO, Class[] exportedInterfaces); /** + * Called by RemoteServiceAdmin to import a service, + * i.e. get a proxy that can be used to access the remote service. + *

* @param cl classloader of the consumer bundle * @param consumerContext bundle context of the consumer bundle * @param interfaces interfaces of the service to proxy * @param endpoint description of the remote endpoint - * @return service proxy to be given to the requesting bundle + * @return an ImportedService that provides the service proxy + * to be given to the requesting bundle, and can be closed + * when the service is no longer used */ - Object importEndpoint(ClassLoader cl, - BundleContext consumerContext, - Class[] interfaces, - EndpointDescription endpoint); + ImportedService importEndpoint(ClassLoader cl, + BundleContext consumerContext, + Class[] interfaces, + EndpointDescription endpoint); } diff --git a/spi/src/main/java/org/apache/aries/rsa/spi/ImportedService.java b/spi/src/main/java/org/apache/aries/rsa/spi/ImportedService.java new file mode 100644 index 000000000..b733a8810 --- /dev/null +++ b/spi/src/main/java/org/apache/aries/rsa/spi/ImportedService.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.spi; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Wraps an imported service proxy, while allowing it to be properly + * closed to release resources, close connections, etc. + */ +public interface ImportedService extends Closeable { + + /** + * Returns the service proxy to be used by consumer bundles. + * + * @return the service proxy + */ + Object getService(); + + /** + * Close the service and release its resources. + *

+ * Implementations should override this to release resources + * associated with the service, close connections, etc. + * when the service is no longer used. + * + * @throws IOException if an error occurs + */ + default void close() throws IOException { + } +} diff --git a/spi/src/main/java/org/apache/aries/rsa/spi/package-info.java b/spi/src/main/java/org/apache/aries/rsa/spi/package-info.java index b622a7972..de124fcd2 100644 --- a/spi/src/main/java/org/apache/aries/rsa/spi/package-info.java +++ b/spi/src/main/java/org/apache/aries/rsa/spi/package-info.java @@ -18,5 +18,5 @@ */ @org.osgi.annotation.bundle.Export -@org.osgi.annotation.versioning.Version("1.1.0") +@org.osgi.annotation.versioning.Version("2.0.0") package org.apache.aries.rsa.spi; \ No newline at end of file