Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -62,7 +62,7 @@
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager;
import org.eclipse.hono.client.pubsub.PubSubConfigProperties;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.PubSubPublisherOptions;
import org.eclipse.hono.client.pubsub.PubSubQuarkusOptions;
import org.eclipse.hono.client.pubsub.publisher.CachingPubSubPublisherFactory;
import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherFactory;
import org.eclipse.hono.client.pubsub.subscriber.CachingPubSubSubscriberFactory;
Expand Down Expand Up @@ -166,8 +166,7 @@ public abstract class AbstractProtocolAdapterApplication<C extends ProtocolAdapt

@Inject
void setCommandConsumerClientOptions(
@ConfigMapping(prefix = "hono.command")
final ClientOptions options) {
@ConfigMapping(prefix = "hono.command") final ClientOptions options) {

final var props = new ClientConfigProperties(options);
props.setServerRoleIfUnknown("Command & Control");
Expand All @@ -187,8 +186,8 @@ void setDownstreamSenderOptions(
}

@Inject
void setPubSubClientOptions(final PubSubPublisherOptions options) {
this.pubSubConfigProperties = new PubSubConfigProperties(options);
void setPubSubClientOptions(final PubSubQuarkusOptions pubSubQuarkusOptions) {
this.pubSubConfigProperties = new PubSubConfigProperties(pubSubQuarkusOptions);
}

@Inject
Expand Down Expand Up @@ -308,7 +307,8 @@ protected void doStart() {
})
.onFailure(t -> LOG.error("failed to deploy adapter verticle(s)", t));

final var notificationReceiver = notificationReceiver(kafkaNotificationConfig, downstreamSenderConfig, pubSubConfigProperties);
final var notificationReceiver = notificationReceiver(kafkaNotificationConfig, downstreamSenderConfig,
pubSubConfigProperties);
final Future<String> notificationReceiverTracker = vertx.deployVerticle(
new WrappedLifecycleComponentVerticle(notificationReceiver))
.onSuccess(ok -> {
Expand Down Expand Up @@ -376,11 +376,11 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
}
if (!appConfig.isPubSubMessagingDisabled() && pubSubConfigProperties.isProjectIdConfigured()) {
LOG.info("Pub/Sub client configuration present, adding Pub/Sub messaging clients");
PubSubMessageHelper.getCredentialsProvider()
PubSubMessageHelper.getCredentialsProvider(pubSubConfigProperties)
.ifPresentOrElse(provider -> {
final var pubSubFactory = new CachingPubSubPublisherFactory(
vertx,
pubSubConfigProperties.getProjectId(),
pubSubConfigProperties,
provider);

telemetrySenderProvider
Expand Down Expand Up @@ -432,15 +432,15 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
}

if (!appConfig.isPubSubMessagingDisabled() && pubSubConfigProperties.isProjectIdConfigured()) {
PubSubMessageHelper.getCredentialsProvider()
PubSubMessageHelper.getCredentialsProvider(pubSubConfigProperties)
.ifPresentOrElse(provider -> {
final var pubsubCommandResponseSender = messagingClientProviders
.getCommandResponseSenderProvider()
.getClient(MessagingType.pubsub);
if (pubsubCommandResponseSender != null) {
final var subscriberFactory = new CachingPubSubSubscriberFactory(
vertx,
pubSubConfigProperties.getProjectId(),
pubSubConfigProperties,
provider);
final var pubSubBasedAdminClientManager = new PubSubBasedAdminClientManager(
pubSubConfigProperties,
Expand Down Expand Up @@ -480,8 +480,8 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
}

/**
* Creates a component that the adapter should use for reporting
* devices connecting/disconnecting to/from the adapter.
* Creates a component that the adapter should use for reporting devices connecting/disconnecting to/from the
* adapter.
*
* @return The component or {@code null} if the configured producer type is <em>none</em> or unsupported.
*/
Expand Down
3 changes: 3 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
<opentracing.version>0.33.0</opentracing.version>
<postgresql-image.name>docker.io/library/postgres:14-alpine</postgresql-image.name>
<qpid-jms.version>1.15.0</qpid-jms.version>
<quarkiverse.googlecloudservices.version>2.18.0</quarkiverse.googlecloudservices.version>
<quarkus.platform.version>3.20.4</quarkus.platform.version>
<slf4j.version>2.0.16</slf4j.version>
<spring-security-crypto.version>6.5.2</spring-security-crypto.version>
Expand Down Expand Up @@ -195,6 +196,8 @@ quarkus.log.category."io.quarkus.vertx.core.runtime".level=DEBUG
quarkus.log.category."org.eclipse.hono".level=${hono.debug.level:INFO}
# Max exec time
quarkus.vertx.max-event-loop-execute-time=${max.event-loop.execute-time:20000}
quarkus.google.cloud.pubsub.emulator-host=${google.cloud.emulator-host:localhost:8085}
quarkus.google.cloud.project-id=${google.cloud.project-id:local-test-project}
</quarkus.application-dev.properties>
</properties>

Expand Down
1 change: 1 addition & 0 deletions clients/pubsub-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<dependency>
<groupId>io.quarkiverse.googlecloudservices</groupId>
<artifactId>quarkus-google-cloud-pubsub</artifactId>
<version>${quarkiverse.googlecloudservices.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
Expand Down
49 changes: 49 additions & 0 deletions clients/pubsub-common/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Local development using PubSub

PubSub commons supports using the PubSub emulator for local development. To enable this, some things need to be taken
care of:

- starting the emulator
- creating the topic structure
- creating a tenant that has the pubsub messaging type
- making sure no other communication disturbs the tests

## Starting the Emulator
To start the emulator, run

```
gcloud beta emulators pubsub start --project=local-test-project
```

## Creating the topic structure
After starting the emulator, some topics have to be created. This can be done by running PubSubEmulatorSetup.

## Creating a tenant that uses Pub/Sub messaging
When creating or updating the tenant, make sure to activate PubSub communication.
```json
{
"enabled": true,
"ext": {
"messaging-type": "pubsub"
}
}
```

## Disable concurrent messaging
When using the emulator while running quarkus:dev, it is recommended to disable kafka and ampq messaging. This ensures
that pubsub is actually used.

```shell
mvn quarkus:dev
-Dhono.kafka-messaging.disabled=true
-Dhono.kafka-messaging.disabled=true
```

## Connecting an example client to MQTT
make sure the client subscribes to`c///q/#` to receive commands.


## Additional thoughts

- Make sure when running quarkus:dev to manage the quarkus.http.port for the services and avoid collisions
- Make sure that the corresponding databases/caches are running, e.g. a device registry postgres or an infinispan
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,14 @@ protected final Future<Void> sendAndWaitForOutcome(
.ifPresent(builder::setData);

final PubsubMessage pubsubMessage = builder.build();

log.debug("sending message to Pub/Sub [topic: {}, registry: {}, deviceId: {}]", topic, tenantId, deviceId);
final String sendingMessageLog = String.format(
"Sending message to Pub/Sub [topic: %s, registry: %s, deviceId: %s]", topic, tenantId, deviceId);
log.debug(sendingMessageLog);
logPubSubMessage(currentSpan, pubsubMessage, topic, tenantId);

return getOrCreatePublisher(topic).publish(pubsubMessage)
final PubSubPublisherClient publisher = getOrCreatePublisher(topic);
currentSpan.log("Got PubSub Publisher. " + sendingMessageLog);
return publisher.publish(pubsubMessage)
.onSuccess(recordMessage -> logPubSubMessageId(currentSpan, topic, recordMessage))
.recover(t -> retrySendToFallbackTopic(topic, currentSpan, tenantId, deviceId, t, pubsubMessage))
.mapEmpty();
Expand All @@ -220,7 +223,9 @@ private Future<String> retrySendToFallbackTopic(final String topic,
publisherFactory.closePublisher(topic);

final String fallbackTopic = PubSubMessageHelper.getTopicName(fallback, tenantId);
log.debug("Retry sending message to Pub/Sub using the fallback topic [{}]", fallbackTopic);
final String logMessage = String.format("Retry sending message to Pub/Sub using the fallback topic [%s]", fallbackTopic);
log.debug(logMessage);
currentSpan.log(logMessage);
// retry publish on fallback topic
return getOrCreatePublisher(fallbackTopic).publish(pubsubMessage)
.onSuccess(recordMessage -> logPubSubMessageId(currentSpan, fallbackTopic, recordMessage))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.LoggerFactory;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
Expand Down Expand Up @@ -53,6 +54,7 @@ public class PubSubBasedAdminClientManager {
private final Vertx vertx;
private SubscriptionAdminClient subscriptionAdminClient;
private TopicAdminClient topicAdminClient;
private final PubSubConfigProperties pubSubConfigProperties;

/**
* Creates a new PubSubBasedAdminClientManager.
Expand All @@ -66,6 +68,11 @@ public PubSubBasedAdminClientManager(final PubSubConfigProperties pubSubConfigPr
final CredentialsProvider credentialsProvider, final Vertx vertx) {
Objects.requireNonNull(pubSubConfigProperties);
this.projectId = Objects.requireNonNull(pubSubConfigProperties.getProjectId());
this.pubSubConfigProperties = pubSubConfigProperties;

if (pubSubConfigProperties.isEmulatorHostConfigured()) {
LOG.debug("Using pubsub emulator.");
}
this.credentialsProvider = Objects.requireNonNull(credentialsProvider);
this.vertx = Objects.requireNonNull(vertx);
}
Expand All @@ -75,15 +82,12 @@ private Future<TopicAdminClient> getOrCreateTopicAdminClient() {
return Future.succeededFuture(topicAdminClient);
}
try {
final TopicAdminSettings adminSettings = TopicAdminSettings
.newBuilder()
.setCredentialsProvider(credentialsProvider)
.build();
final TopicAdminSettings adminSettings = getTopicAdminSettings();
topicAdminClient = TopicAdminClient.create(adminSettings);
return Future.succeededFuture(topicAdminClient);
} catch (IOException e) {
LOG.debug("Error initializing topic admin client: {}", e.getMessage());
return Future.failedFuture("Error creating client");
return Future.failedFuture("Error creating topic admin client");
}
}

Expand All @@ -92,16 +96,43 @@ private Future<SubscriptionAdminClient> getOrCreateSubscriptionAdminClient() {
return Future.succeededFuture(subscriptionAdminClient);
}
try {
final SubscriptionAdminSettings adminSettings = SubscriptionAdminSettings
.newBuilder()
.setCredentialsProvider(credentialsProvider)
.build();
final SubscriptionAdminSettings adminSettings = getSubscriptionAdminSettings();
subscriptionAdminClient = SubscriptionAdminClient.create(adminSettings);
return Future.succeededFuture(subscriptionAdminClient);
} catch (IOException e) {
LOG.debug("Error initializing subscription admin client: {}", e.getMessage());
return Future.failedFuture("Error creating client");
return Future.failedFuture("Error creating subscription admin client");
}
}

private TopicAdminSettings getTopicAdminSettings() throws IOException {
final TopicAdminSettings.Builder builder = TopicAdminSettings.newBuilder();

if (pubSubConfigProperties.isEmulatorHostConfigured()) {
final var channelProvider = PubSubMessageHelper.getTransportChannelProvider(pubSubConfigProperties);
builder
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create());
} else {
builder.setCredentialsProvider(credentialsProvider);
}

return builder.build();
}

private SubscriptionAdminSettings getSubscriptionAdminSettings() throws IOException {
final SubscriptionAdminSettings.Builder builder = SubscriptionAdminSettings.newBuilder();

if (pubSubConfigProperties.isEmulatorHostConfigured()) {
final var channelProvider = PubSubMessageHelper.getTransportChannelProvider(pubSubConfigProperties);
builder
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create());
} else {
builder.setCredentialsProvider(credentialsProvider);
}

return builder.build();
}

/**
Expand Down Expand Up @@ -137,8 +168,8 @@ private Future<String> createTopic(final TopicName topicName, final TopicAdminCl
return vertx.executeBlocking(() -> {
return client.createTopic(topicName).getName();
})
.onSuccess(top -> LOG.debug("Topic {} created successfully.", topicName))
.onFailure(thr -> LOG.debug("Creating topic failed [topic: {}, projectId: {}]", topicName, projectId));
.onSuccess(top -> LOG.debug("Topic {} created successfully.", topicName))
.onFailure(thr -> LOG.debug("Creating topic failed [topic: {}, projectId: {}]", topicName, projectId));
}

/**
Expand Down Expand Up @@ -168,17 +199,17 @@ public Future<String> getOrCreateSubscription(final String endpoint, final Strin
}

private Future<String> getSubscription(
final SubscriptionName subscriptionName,
final SubscriptionAdminClient client) {
final SubscriptionName subscriptionName,
final SubscriptionAdminClient client) {
return vertx.executeBlocking(() -> {
return client.getSubscription(subscriptionName).getName();
});
}

private Future<String> createSubscription(
final SubscriptionName subscriptionName,
final TopicName topicName,
final SubscriptionAdminClient client) {
final SubscriptionName subscriptionName,
final TopicName topicName,
final SubscriptionAdminClient client) {
final Subscription request = Subscription.newBuilder()
.setName(subscriptionName.toString())
.setTopic(topicName.toString())
Expand All @@ -190,15 +221,15 @@ private Future<String> createSubscription(
return vertx.executeBlocking(() -> {
return client.createSubscription(request).getName();
})
.onSuccess(sub -> LOG.debug("Subscription {} created successfully.", subscriptionName))
.onFailure(thr -> LOG.debug(
"Creating subscription failed [subscription: {}, topic: {}, project: {}]",
subscriptionName, topicName, projectId));
.onSuccess(sub -> LOG.debug("Subscription {} created successfully.", subscriptionName))
.onFailure(thr -> LOG.debug(
"Creating subscription failed [subscription: {}, topic: {}, project: {}]",
subscriptionName, topicName, projectId));
}

/**
* Closes the TopicAdminClient and the SubscriptionAdminClient if they exist. This method is expected to be invoked
* as soon as the TopicAdminClient and the SubscriptionAdminClient is no longer needed. This method will block the
* as soon as the TopicAdminClient and the SubscriptionAdminClient are no longer needed. This method will block the
* current thread for up to 10 seconds!
*/
public void closeAdminClients() {
Expand All @@ -213,26 +244,28 @@ public void closeAdminClients() {
}

private void closeSubscriptionAdminClient() {
if (subscriptionAdminClient != null) {
subscriptionAdminClient.shutdown();
try {
subscriptionAdminClient.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.debug("Resources are not freed properly, error", e);
Thread.currentThread().interrupt();
}
if (subscriptionAdminClient == null) {
return;
}
subscriptionAdminClient.shutdown();
try {
subscriptionAdminClient.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.debug("Resources are not freed properly, error", e);
Thread.currentThread().interrupt();
}
}

private void closeTopicAdminClient() {
if (topicAdminClient != null) {
topicAdminClient.shutdown();
try {
topicAdminClient.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.debug("Resources are not freed properly, error", e);
Thread.currentThread().interrupt();
}
if (topicAdminClient == null) {
return;
}
topicAdminClient.shutdown();
try {
topicAdminClient.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.debug("Resources are not freed properly, error", e);
Thread.currentThread().interrupt();
}
}
}
Loading