Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.aries.rsa.itests.felix.RsaTestBase;
import org.apache.aries.rsa.spi.DistributionProvider;
import org.apache.aries.rsa.spi.EndpointDescriptionParser;
import org.apache.aries.rsa.spi.ImportedService;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -67,10 +68,11 @@ public static Option[] configure() throws Exception {
@Test
public void testDiscoveryExport() throws Exception {
EndpointDescription epd = getEndpoint();
EchoService service = (EchoService)tcpProvider
.importEndpoint(EchoService.class.getClassLoader(),
bundleContext, new Class[]{EchoService.class}, epd);
ImportedService importedService = tcpProvider.importEndpoint(EchoService.class.getClassLoader(),
bundleContext, new Class[]{EchoService.class}, epd);
EchoService service = (EchoService)importedService.getService();
Assert.assertEquals("test", service.echo("test"));
importedService.close();
}

private EndpointDescription getEndpoint() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.aries.rsa.provider.fastbin.util.UuidGenerator;
import org.apache.aries.rsa.spi.DistributionProvider;
import org.apache.aries.rsa.spi.Endpoint;
import org.apache.aries.rsa.spi.ImportedService;
import org.apache.aries.rsa.spi.IntentUnsatisfiedException;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
Expand Down Expand Up @@ -172,15 +173,16 @@ public void close() throws IOException {
}

@Override
public Object importEndpoint(ClassLoader cl,
BundleContext consumerContext,
Class[] interfaces,
EndpointDescription endpoint)
public ImportedService importEndpoint(ClassLoader cl,
BundleContext consumerContext,
Class[] interfaces,
EndpointDescription endpoint)
throws IntentUnsatisfiedException {

String address = (String) endpoint.getProperties().get(FASTBIN_ADDRESS);
InvocationHandler handler = client.getProxy(address, endpoint.getId(), cl);
return Proxy.newProxyInstance(cl, interfaces, handler);
Object service = Proxy.newProxyInstance(cl, interfaces, handler);
return () -> service;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ public TcpInvocationHandler(ClassLoader cl, String host, int port, String endpoi

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// handle Object methods locally so we can use equals, HashMap, etc. normally
if (method.getDeclaringClass() == Object.class) {
switch (method.getName()) {
case "equals": return proxy == args[0];
case "hashCode": return System.identityHashCode(proxy);
case "toString": return proxy.getClass().getName() + "@"
+ Integer.toHexString(System.identityHashCode(proxy));
}
}
// handle remote invocation
if (Future.class.isAssignableFrom(method.getReturnType()) ||
CompletionStage.class.isAssignableFrom(method.getReturnType())) {
return createFutureResult(method, args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.aries.rsa.annotations.RSADistributionProvider;
import org.apache.aries.rsa.spi.DistributionProvider;
import org.apache.aries.rsa.spi.Endpoint;
import org.apache.aries.rsa.spi.ImportedService;
import org.apache.aries.rsa.spi.IntentUnsatisfiedException;
import org.apache.aries.rsa.util.StringPlus;
import org.osgi.framework.BundleContext;
Expand Down Expand Up @@ -126,17 +127,18 @@ private synchronized void removeServer(TcpEndpoint endpoint) {
}

@Override
public Object importEndpoint(ClassLoader cl,
BundleContext consumerContext,
Class[] interfaces,
EndpointDescription endpoint)
public ImportedService importEndpoint(ClassLoader cl,
BundleContext consumerContext,
Class[] interfaces,
EndpointDescription endpoint)
throws IntentUnsatisfiedException {
try {
String endpointId = endpoint.getId();
URI address = new URI(endpointId);
int timeout = new EndpointPropertiesParser(endpoint).getTimeoutMillis();
InvocationHandler handler = new TcpInvocationHandler(cl, address.getHost(), address.getPort(), endpointId, timeout);
return Proxy.newProxyInstance(cl, interfaces, handler);
Object service = Proxy.newProxyInstance(cl, interfaces, handler);
return () -> service;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.aries.rsa.provider.tcp.myservice.PrimitiveService;
import org.apache.aries.rsa.provider.tcp.myservice.PrimitiveServiceImpl;
import org.apache.aries.rsa.spi.Endpoint;
import org.apache.aries.rsa.spi.ImportedService;
import org.apache.aries.rsa.util.EndpointHelper;
import org.easymock.EasyMock;
import org.junit.AfterClass;
Expand All @@ -52,6 +53,7 @@ public class TcpProviderPrimitiveTest {

private static PrimitiveService myServiceProxy;
private static Endpoint ep;
private static ImportedService importedService;

@BeforeClass
public static void createServerAndProxy() {
Expand All @@ -66,10 +68,11 @@ public static void createServerAndProxy() {
ep = provider.exportService(myService, bc, props, exportedInterfaces);
assertThat(ep.description().getId(), startsWith("tcp://localhost:"));
System.out.println(ep.description());
myServiceProxy = (PrimitiveService)provider.importEndpoint(PrimitiveService.class.getClassLoader(),
bc,
exportedInterfaces,
ep.description());
importedService = provider.importEndpoint(PrimitiveService.class.getClassLoader(),
bc,
exportedInterfaces,
ep.description());
myServiceProxy = (PrimitiveService)importedService.getService();
}

@Test
Expand Down Expand Up @@ -164,6 +167,7 @@ public void testDTOAr() {

@AfterClass
public static void close() throws IOException {
importedService.close();
ep.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.aries.rsa.provider.tcp.myservice.MyService;
import org.apache.aries.rsa.provider.tcp.myservice.MyServiceImpl;
import org.apache.aries.rsa.spi.Endpoint;
import org.apache.aries.rsa.spi.ImportedService;
import org.apache.aries.rsa.util.EndpointHelper;
import org.easymock.EasyMock;
import org.junit.After;
Expand All @@ -70,6 +71,8 @@ public class TcpProviderTest {
private MyService myServiceProxy2;
private Endpoint ep;
private Endpoint ep2;
private ImportedService importedService;
private ImportedService importedService2;

protected static int getFreePort() throws IOException {
try (ServerSocket socket = new ServerSocket()) {
Expand All @@ -96,21 +99,25 @@ public void createServerAndProxy() throws IOException {
props.put("aries.rsa.id", "service2");
ep2 = provider.exportService(new MyServiceImpl("service2"), bc, props, exportedInterfaces);
assertThat(ep.description().getId(), startsWith("tcp://localhost:"));
myServiceProxy = (MyService)provider.importEndpoint(
MyService.class.getClassLoader(),
bc,
exportedInterfaces,
ep.description());
myServiceProxy2 = (MyService)provider.importEndpoint(
MyService.class.getClassLoader(),
bc,
exportedInterfaces,
ep2.description());
importedService = provider.importEndpoint(
MyService.class.getClassLoader(),
bc,
exportedInterfaces,
ep.description());
myServiceProxy = (MyService)importedService.getService();
importedService2 = provider.importEndpoint(
MyService.class.getClassLoader(),
bc,
exportedInterfaces,
ep2.description());
myServiceProxy2 = (MyService)importedService2.getService();
}


@After
public void close() throws IOException {
importedService.close();
importedService2.close();
ep.close();
ep2.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
*/
package org.apache.aries.rsa.core;

import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.aries.rsa.spi.DistributionProvider;
import org.apache.aries.rsa.spi.ImportedService;
import org.apache.aries.rsa.spi.IntentUnsatisfiedException;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
Expand All @@ -44,7 +48,7 @@ public class ClientServiceFactory implements ServiceFactory {
private ImportRegistrationImpl importRegistration;

private boolean closeable;
private int serviceCounter;
private Map<Object, ImportedService> services = new HashMap<>();

public ClientServiceFactory(EndpointDescription endpoint,
DistributionProvider handler, ImportRegistrationImpl ir) {
Expand All @@ -63,15 +67,16 @@ public Object getService(final Bundle requestingBundle, final ServiceRegistratio
for (String ifaceName : interfaceNames) {
interfaces.add(consumerLoader.loadClass(ifaceName));
}
Object proxy = AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
ImportedService importedService = AccessController.doPrivileged(new PrivilegedAction<ImportedService>() {
public ImportedService run() {
Class<?>[] ifAr = interfaces.toArray(new Class[]{});
return handler.importEndpoint(consumerLoader, consumerContext, ifAr, endpoint);
}
});

Object proxy = importedService.getService();
synchronized (this) {
serviceCounter++;
services.put(proxy, importedService);
}
return proxy;
} catch (IntentUnsatisfiedException iue) {
Expand All @@ -85,8 +90,15 @@ public Object run() {

public void ungetService(Bundle requestingBundle, ServiceRegistration sreg, Object serviceObject) {
synchronized (this) {
serviceCounter--;
LOG.debug("Services still provided by this ServiceFactory: {}", serviceCounter);
ImportedService importedService = services.remove(serviceObject);
if (importedService != null) {
try {
importedService.close();
} catch (IOException e) {
LOG.warn("Problem closing imported service proxy {} for {}", serviceObject, requestingBundle, e);
}
}
LOG.debug("Services still provided by this ServiceFactory: {}", services.size());
closeIfUnused();
}
}
Expand All @@ -99,7 +111,7 @@ public void setCloseable(boolean closeable) {
}

private synchronized void closeIfUnused() {
if (serviceCounter <= 0 && closeable) {
if (services.isEmpty() && closeable) {
importRegistration.closeAll();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private DistributionProvider mockDistributionProvider(final Object proxy) {
EasyMock.expect(handler.importEndpoint(anyObject(ClassLoader.class),
anyObject(BundleContext.class),
isA(Class[].class),
anyObject(EndpointDescription.class))).andReturn(proxy);
anyObject(EndpointDescription.class))).andReturn(() -> proxy);
EasyMock.replay(handler);
return handler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.aries.rsa.core.event.EventProducer;
import org.apache.aries.rsa.spi.DistributionProvider;
import org.apache.aries.rsa.spi.Endpoint;
import org.apache.aries.rsa.spi.ImportedService;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IMocksControl;
Expand Down Expand Up @@ -429,7 +430,7 @@ public Endpoint exportService(Object serviceO, BundleContext serviceContext,
}

@Override
public Object importEndpoint(ClassLoader cl, BundleContext consumerContext, Class[] interfaces,
public ImportedService importEndpoint(ClassLoader cl, BundleContext consumerContext, Class[] interfaces,
EndpointDescription endpoint) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface DistributionProvider {

/**
* Called by RemoteServiceAdmin to export a service.
*
* <p>
* The Distribution provider will be called if no config type was set or
* if it supports the config type.
*
Expand All @@ -46,14 +46,19 @@ Endpoint exportService(Object serviceO,
Class[] exportedInterfaces);

/**
* Called by RemoteServiceAdmin to import a service,
* i.e. get a proxy that can be used to access the remote service.
* <p>
* @param cl classloader of the consumer bundle
* @param consumerContext bundle context of the consumer bundle
* @param interfaces interfaces of the service to proxy
* @param endpoint description of the remote endpoint
* @return service proxy to be given to the requesting bundle
* @return an ImportedService that provides the service proxy
* to be given to the requesting bundle, and can be closed
* when the service is no longer used
*/
Object importEndpoint(ClassLoader cl,
BundleContext consumerContext,
Class[] interfaces,
EndpointDescription endpoint);
ImportedService importEndpoint(ClassLoader cl,
BundleContext consumerContext,
Class[] interfaces,
EndpointDescription endpoint);
}
48 changes: 48 additions & 0 deletions spi/src/main/java/org/apache/aries/rsa/spi/ImportedService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.aries.rsa.spi;

import java.io.Closeable;
import java.io.IOException;

/**
* Wraps an imported service proxy, while allowing it to be properly
* closed to release resources, close connections, etc.
*/
public interface ImportedService extends Closeable {

/**
* Returns the service proxy to be used by consumer bundles.
*
* @return the service proxy
*/
Object getService();

/**
* Close the service and release its resources.
* <p>
* Implementations should override this to release resources
* associated with the service, close connections, etc.
* when the service is no longer used.
*
* @throws IOException if an error occurs
*/
default void close() throws IOException {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
*/

@org.osgi.annotation.bundle.Export
@org.osgi.annotation.versioning.Version("1.1.0")
@org.osgi.annotation.versioning.Version("2.0.0")
package org.apache.aries.rsa.spi;