diff --git a/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java b/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java index aeafa8ffd0..b6edc86183 100644 --- a/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java +++ b/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2020 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -62,7 +62,7 @@ import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager; import org.eclipse.hono.client.pubsub.PubSubConfigProperties; import org.eclipse.hono.client.pubsub.PubSubMessageHelper; -import org.eclipse.hono.client.pubsub.PubSubPublisherOptions; +import org.eclipse.hono.client.pubsub.PubSubQuarkusOptions; import org.eclipse.hono.client.pubsub.publisher.CachingPubSubPublisherFactory; import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherFactory; import org.eclipse.hono.client.pubsub.subscriber.CachingPubSubSubscriberFactory; @@ -166,8 +166,7 @@ public abstract class AbstractProtocolAdapterApplication LOG.error("failed to deploy adapter verticle(s)", t)); - final var notificationReceiver = notificationReceiver(kafkaNotificationConfig, downstreamSenderConfig, pubSubConfigProperties); + final var notificationReceiver = notificationReceiver(kafkaNotificationConfig, downstreamSenderConfig, + pubSubConfigProperties); final Future notificationReceiverTracker = vertx.deployVerticle( new WrappedLifecycleComponentVerticle(notificationReceiver)) .onSuccess(ok -> { @@ -376,11 +376,11 @@ protected void setCollaborators(final AbstractProtocolAdapterBase adapter) { } if (!appConfig.isPubSubMessagingDisabled() && pubSubConfigProperties.isProjectIdConfigured()) { LOG.info("Pub/Sub client configuration present, adding Pub/Sub messaging clients"); - PubSubMessageHelper.getCredentialsProvider() + PubSubMessageHelper.getCredentialsProvider(pubSubConfigProperties) .ifPresentOrElse(provider -> { final var pubSubFactory = new CachingPubSubPublisherFactory( vertx, - pubSubConfigProperties.getProjectId(), + pubSubConfigProperties, provider); telemetrySenderProvider @@ -432,7 +432,7 @@ protected void setCollaborators(final AbstractProtocolAdapterBase adapter) { } if (!appConfig.isPubSubMessagingDisabled() && pubSubConfigProperties.isProjectIdConfigured()) { - PubSubMessageHelper.getCredentialsProvider() + PubSubMessageHelper.getCredentialsProvider(pubSubConfigProperties) .ifPresentOrElse(provider -> { final var pubsubCommandResponseSender = messagingClientProviders .getCommandResponseSenderProvider() @@ -440,7 +440,7 @@ protected void setCollaborators(final AbstractProtocolAdapterBase adapter) { if (pubsubCommandResponseSender != null) { final var subscriberFactory = new CachingPubSubSubscriberFactory( vertx, - pubSubConfigProperties.getProjectId(), + pubSubConfigProperties, provider); final var pubSubBasedAdminClientManager = new PubSubBasedAdminClientManager( pubSubConfigProperties, @@ -480,8 +480,8 @@ protected void setCollaborators(final AbstractProtocolAdapterBase adapter) { } /** - * Creates a component that the adapter should use for reporting - * devices connecting/disconnecting to/from the adapter. + * Creates a component that the adapter should use for reporting devices connecting/disconnecting to/from the + * adapter. * * @return The component or {@code null} if the configured producer type is none or unsupported. */ diff --git a/bom/pom.xml b/bom/pom.xml index 61957b25a0..57302ed5d3 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -48,6 +48,7 @@ 0.33.0 docker.io/library/postgres:14-alpine 1.15.0 + 2.18.0 3.20.4 2.0.16 6.5.2 @@ -195,6 +196,8 @@ quarkus.log.category."io.quarkus.vertx.core.runtime".level=DEBUG quarkus.log.category."org.eclipse.hono".level=${hono.debug.level:INFO} # Max exec time quarkus.vertx.max-event-loop-execute-time=${max.event-loop.execute-time:20000} +quarkus.google.cloud.pubsub.emulator-host=${google.cloud.emulator-host:localhost:8085} +quarkus.google.cloud.project-id=${google.cloud.project-id:local-test-project} diff --git a/clients/pubsub-common/pom.xml b/clients/pubsub-common/pom.xml index bf39a35cc0..9d05f47c67 100644 --- a/clients/pubsub-common/pom.xml +++ b/clients/pubsub-common/pom.xml @@ -36,6 +36,7 @@ io.quarkiverse.googlecloudservices quarkus-google-cloud-pubsub + ${quarkiverse.googlecloudservices.version} commons-logging diff --git a/clients/pubsub-common/readme.md b/clients/pubsub-common/readme.md new file mode 100644 index 0000000000..cccc4990ff --- /dev/null +++ b/clients/pubsub-common/readme.md @@ -0,0 +1,49 @@ +# Local development using PubSub + +PubSub commons supports using the PubSub emulator for local development. To enable this, some things need to be taken +care of: + +- starting the emulator +- creating the topic structure +- creating a tenant that has the pubsub messaging type +- making sure no other communication disturbs the tests + +## Starting the Emulator +To start the emulator, run + +``` +gcloud beta emulators pubsub start --project=local-test-project +``` + +## Creating the topic structure +After starting the emulator, some topics have to be created. This can be done by running PubSubEmulatorSetup. + +## Creating a tenant that uses Pub/Sub messaging +When creating or updating the tenant, make sure to activate PubSub communication. +```json +{ + "enabled": true, + "ext": { + "messaging-type": "pubsub" + } +} +``` + +## Disable concurrent messaging +When using the emulator while running quarkus:dev, it is recommended to disable kafka and ampq messaging. This ensures +that pubsub is actually used. + +```shell +mvn quarkus:dev +-Dhono.kafka-messaging.disabled=true +-Dhono.kafka-messaging.disabled=true +``` + +## Connecting an example client to MQTT +make sure the client subscribes to`c///q/#` to receive commands. + + +## Additional thoughts + +- Make sure when running quarkus:dev to manage the quarkus.http.port for the services and avoid collisions +- Make sure that the corresponding databases/caches are running, e.g. a device registry postgres or an infinispan diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/AbstractPubSubBasedMessageSender.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/AbstractPubSubBasedMessageSender.java index 5a842389a2..fad106bd7b 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/AbstractPubSubBasedMessageSender.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/AbstractPubSubBasedMessageSender.java @@ -193,11 +193,14 @@ protected final Future sendAndWaitForOutcome( .ifPresent(builder::setData); final PubsubMessage pubsubMessage = builder.build(); - - log.debug("sending message to Pub/Sub [topic: {}, registry: {}, deviceId: {}]", topic, tenantId, deviceId); + final String sendingMessageLog = String.format( + "Sending message to Pub/Sub [topic: %s, registry: %s, deviceId: %s]", topic, tenantId, deviceId); + log.debug(sendingMessageLog); logPubSubMessage(currentSpan, pubsubMessage, topic, tenantId); - return getOrCreatePublisher(topic).publish(pubsubMessage) + final PubSubPublisherClient publisher = getOrCreatePublisher(topic); + currentSpan.log("Got PubSub Publisher. " + sendingMessageLog); + return publisher.publish(pubsubMessage) .onSuccess(recordMessage -> logPubSubMessageId(currentSpan, topic, recordMessage)) .recover(t -> retrySendToFallbackTopic(topic, currentSpan, tenantId, deviceId, t, pubsubMessage)) .mapEmpty(); @@ -220,7 +223,9 @@ private Future retrySendToFallbackTopic(final String topic, publisherFactory.closePublisher(topic); final String fallbackTopic = PubSubMessageHelper.getTopicName(fallback, tenantId); - log.debug("Retry sending message to Pub/Sub using the fallback topic [{}]", fallbackTopic); + final String logMessage = String.format("Retry sending message to Pub/Sub using the fallback topic [%s]", fallbackTopic); + log.debug(logMessage); + currentSpan.log(logMessage); // retry publish on fallback topic return getOrCreatePublisher(fallbackTopic).publish(pubsubMessage) .onSuccess(recordMessage -> logPubSubMessageId(currentSpan, fallbackTopic, recordMessage)) diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java index e9b3e2cf09..10b8d5dc7d 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java @@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory; import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.rpc.NotFoundException; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; @@ -53,6 +54,7 @@ public class PubSubBasedAdminClientManager { private final Vertx vertx; private SubscriptionAdminClient subscriptionAdminClient; private TopicAdminClient topicAdminClient; + private final PubSubConfigProperties pubSubConfigProperties; /** * Creates a new PubSubBasedAdminClientManager. @@ -66,6 +68,11 @@ public PubSubBasedAdminClientManager(final PubSubConfigProperties pubSubConfigPr final CredentialsProvider credentialsProvider, final Vertx vertx) { Objects.requireNonNull(pubSubConfigProperties); this.projectId = Objects.requireNonNull(pubSubConfigProperties.getProjectId()); + this.pubSubConfigProperties = pubSubConfigProperties; + + if (pubSubConfigProperties.isEmulatorHostConfigured()) { + LOG.debug("Using pubsub emulator."); + } this.credentialsProvider = Objects.requireNonNull(credentialsProvider); this.vertx = Objects.requireNonNull(vertx); } @@ -75,15 +82,12 @@ private Future getOrCreateTopicAdminClient() { return Future.succeededFuture(topicAdminClient); } try { - final TopicAdminSettings adminSettings = TopicAdminSettings - .newBuilder() - .setCredentialsProvider(credentialsProvider) - .build(); + final TopicAdminSettings adminSettings = getTopicAdminSettings(); topicAdminClient = TopicAdminClient.create(adminSettings); return Future.succeededFuture(topicAdminClient); } catch (IOException e) { LOG.debug("Error initializing topic admin client: {}", e.getMessage()); - return Future.failedFuture("Error creating client"); + return Future.failedFuture("Error creating topic admin client"); } } @@ -92,16 +96,43 @@ private Future getOrCreateSubscriptionAdminClient() { return Future.succeededFuture(subscriptionAdminClient); } try { - final SubscriptionAdminSettings adminSettings = SubscriptionAdminSettings - .newBuilder() - .setCredentialsProvider(credentialsProvider) - .build(); + final SubscriptionAdminSettings adminSettings = getSubscriptionAdminSettings(); subscriptionAdminClient = SubscriptionAdminClient.create(adminSettings); return Future.succeededFuture(subscriptionAdminClient); } catch (IOException e) { LOG.debug("Error initializing subscription admin client: {}", e.getMessage()); - return Future.failedFuture("Error creating client"); + return Future.failedFuture("Error creating subscription admin client"); + } + } + + private TopicAdminSettings getTopicAdminSettings() throws IOException { + final TopicAdminSettings.Builder builder = TopicAdminSettings.newBuilder(); + + if (pubSubConfigProperties.isEmulatorHostConfigured()) { + final var channelProvider = PubSubMessageHelper.getTransportChannelProvider(pubSubConfigProperties); + builder + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(NoCredentialsProvider.create()); + } else { + builder.setCredentialsProvider(credentialsProvider); + } + + return builder.build(); + } + + private SubscriptionAdminSettings getSubscriptionAdminSettings() throws IOException { + final SubscriptionAdminSettings.Builder builder = SubscriptionAdminSettings.newBuilder(); + + if (pubSubConfigProperties.isEmulatorHostConfigured()) { + final var channelProvider = PubSubMessageHelper.getTransportChannelProvider(pubSubConfigProperties); + builder + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(NoCredentialsProvider.create()); + } else { + builder.setCredentialsProvider(credentialsProvider); } + + return builder.build(); } /** @@ -137,8 +168,8 @@ private Future createTopic(final TopicName topicName, final TopicAdminCl return vertx.executeBlocking(() -> { return client.createTopic(topicName).getName(); }) - .onSuccess(top -> LOG.debug("Topic {} created successfully.", topicName)) - .onFailure(thr -> LOG.debug("Creating topic failed [topic: {}, projectId: {}]", topicName, projectId)); + .onSuccess(top -> LOG.debug("Topic {} created successfully.", topicName)) + .onFailure(thr -> LOG.debug("Creating topic failed [topic: {}, projectId: {}]", topicName, projectId)); } /** @@ -168,17 +199,17 @@ public Future getOrCreateSubscription(final String endpoint, final Strin } private Future getSubscription( - final SubscriptionName subscriptionName, - final SubscriptionAdminClient client) { + final SubscriptionName subscriptionName, + final SubscriptionAdminClient client) { return vertx.executeBlocking(() -> { return client.getSubscription(subscriptionName).getName(); }); } private Future createSubscription( - final SubscriptionName subscriptionName, - final TopicName topicName, - final SubscriptionAdminClient client) { + final SubscriptionName subscriptionName, + final TopicName topicName, + final SubscriptionAdminClient client) { final Subscription request = Subscription.newBuilder() .setName(subscriptionName.toString()) .setTopic(topicName.toString()) @@ -190,15 +221,15 @@ private Future createSubscription( return vertx.executeBlocking(() -> { return client.createSubscription(request).getName(); }) - .onSuccess(sub -> LOG.debug("Subscription {} created successfully.", subscriptionName)) - .onFailure(thr -> LOG.debug( - "Creating subscription failed [subscription: {}, topic: {}, project: {}]", - subscriptionName, topicName, projectId)); + .onSuccess(sub -> LOG.debug("Subscription {} created successfully.", subscriptionName)) + .onFailure(thr -> LOG.debug( + "Creating subscription failed [subscription: {}, topic: {}, project: {}]", + subscriptionName, topicName, projectId)); } /** * Closes the TopicAdminClient and the SubscriptionAdminClient if they exist. This method is expected to be invoked - * as soon as the TopicAdminClient and the SubscriptionAdminClient is no longer needed. This method will block the + * as soon as the TopicAdminClient and the SubscriptionAdminClient are no longer needed. This method will block the * current thread for up to 10 seconds! */ public void closeAdminClients() { @@ -213,26 +244,28 @@ public void closeAdminClients() { } private void closeSubscriptionAdminClient() { - if (subscriptionAdminClient != null) { - subscriptionAdminClient.shutdown(); - try { - subscriptionAdminClient.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOG.debug("Resources are not freed properly, error", e); - Thread.currentThread().interrupt(); - } + if (subscriptionAdminClient == null) { + return; + } + subscriptionAdminClient.shutdown(); + try { + subscriptionAdminClient.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.debug("Resources are not freed properly, error", e); + Thread.currentThread().interrupt(); } } private void closeTopicAdminClient() { - if (topicAdminClient != null) { - topicAdminClient.shutdown(); - try { - topicAdminClient.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOG.debug("Resources are not freed properly, error", e); - Thread.currentThread().interrupt(); - } + if (topicAdminClient == null) { + return; + } + topicAdminClient.shutdown(); + try { + topicAdminClient.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.debug("Resources are not freed properly, error", e); + Thread.currentThread().interrupt(); } } } diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubConfigProperties.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubConfigProperties.java index 39e807b3a9..81795c9527 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubConfigProperties.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubConfigProperties.java @@ -13,23 +13,26 @@ package org.eclipse.hono.client.pubsub; +import io.netty.util.internal.StringUtil; import jakarta.inject.Singleton; /** - * Common configuration properties required for access to Pub/Sub. + * Common configuration properties required for using Pub/Sub. */ @Singleton public final class PubSubConfigProperties { private String projectId = null; + private String emulatorHost = null; /** - * Creates properties based on existing options. + * Creates properties based on existing pubSubPublisherOptions. * - * @param options The options to copy. + * @param pubSubQuarkusOptions The pubSubQuarkusOptions containing the Google Cloud Project ID. */ - public PubSubConfigProperties(final PubSubPublisherOptions options) { - setProjectId(options.projectId().orElse(null)); + public PubSubConfigProperties(final PubSubQuarkusOptions pubSubQuarkusOptions) { + setProjectId(pubSubQuarkusOptions.projectId().orElse(null)); + setEmulatorHost(pubSubQuarkusOptions.pubsub().emulatorHost().orElse(null)); } /** @@ -56,6 +59,33 @@ public void setProjectId(final String projectId) { * @return {@code true} if the projectId property has been set via {@link #setProjectId(String)}. */ public boolean isProjectIdConfigured() { - return projectId != null; + return projectId != null && StringUtil.length(projectId) > 0; + } + + /** + * Gets the emulator host that the Pub/Sub client is configured to connect to. + * + * @return The emulatorHost or {@code null} if no emulatorHost has been set. + */ + public String getEmulatorHost() { + return emulatorHost; + } + + /** + * Sets the emulator host that the Pub/Sub client should connect to. + * + * @param emulatorHost The emulator host if configured. + */ + public void setEmulatorHost(final String emulatorHost) { + this.emulatorHost = emulatorHost; + } + + /** + * Checks if the emulatorHost property has been explicitly set. + * + * @return {@code true} if the emulatorHost property has been set via {@link #setEmulatorHost(String)}. + */ + public boolean isEmulatorHostConfigured() { + return emulatorHost != null && StringUtil.length(emulatorHost) > 0; } } diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java index b6d73bb244..c7a1f6af54 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java @@ -26,10 +26,17 @@ import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + /** * Utility methods for working with Pub/Sub. */ @@ -44,7 +51,8 @@ public final class PubSubMessageHelper { */ public static final String PUBSUB_PROPERTY_RESPONSE_REQUIRED = "response-required"; /** - * The name of the Pub/Sub message property indicating whether an acknowledgement to the message is expected/required. + * The name of the Pub/Sub message property indicating whether an acknowledgement to the message is + * expected/required. */ public static final String PUBSUB_PROPERTY_ACK_REQUIRED = "ack-required"; @@ -60,12 +68,16 @@ private PubSubMessageHelper() { /** * Gets the provider for credentials to use for authenticating to the Pub/Sub service. * + * @param pubSubConfigProperties If emulator host is configured, take NoCredentialsProvider, otherwise take FixedCredentialsProvider * @return An optional containing a CredentialsProvider to use for authenticating to the Pub/Sub service or an empty * optional if the given GoogleCredentials is {@code null}. */ - public static Optional getCredentialsProvider() { - return Optional.ofNullable(getCredentials()) - .map(FixedCredentialsProvider::create); + public static Optional getCredentialsProvider( + final PubSubConfigProperties pubSubConfigProperties) { + if (pubSubConfigProperties.isEmulatorHostConfigured()) { + return Optional.of(NoCredentialsProvider.create()); + } + return Optional.ofNullable(getCredentials()).map(FixedCredentialsProvider::create); } private static GoogleCredentials getCredentials() { @@ -77,6 +89,20 @@ private static GoogleCredentials getCredentials() { } } + /** + * creates the transport channel provider for the given emulator host. + * + * @param pubSubConfigProperties contains the emulator host, e.g. localhost:8085 + * @return the transport channel provider. + */ + public static TransportChannelProvider getTransportChannelProvider(final PubSubConfigProperties pubSubConfigProperties) { + final ManagedChannel channel = ManagedChannelBuilder + .forTarget(pubSubConfigProperties.getEmulatorHost()) + .usePlaintext() + .build(); + return FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); + } + /** * Gets the topic name with the given prefix. * @@ -109,7 +135,8 @@ public static String getTopicName(final String topic, final String prefix, final * Gets the subtopics from the orig_address attribute of the message. * * @param origAddress The orig_address attribute. - * @return An immutable list containing all the subtopics in hierarchical order or an empty immutable list if the topic has no subtopics. + * @return An immutable list containing all the subtopics in hierarchical order or an empty immutable list if the + * topic has no subtopics. */ public static List getSubtopics(final String origAddress) { final String trimmedOrigAddress = origAddress.startsWith("/") ? origAddress.substring(1) : origAddress; diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubPublisherOptions.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubPublisherOptions.java index f07edbb197..bf2828a15e 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubPublisherOptions.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubPublisherOptions.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2022 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubQuarkusOptions.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubQuarkusOptions.java new file mode 100644 index 0000000000..e9ea90af15 --- /dev/null +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubQuarkusOptions.java @@ -0,0 +1,55 @@ +/** + * Copyright (c) 2022 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.hono.client.pubsub; + +import java.util.Optional; + +import io.smallrye.config.ConfigMapping; + +/** + * Common options for configuring a Quarkus Google Cloud client. + *

+ * We are using the prefix and naming strategy to match the property name(s) defined by the + * Quarkus + * Google Cloud Services extension + */ +@ConfigMapping(prefix = "quarkus.google.cloud", namingStrategy = ConfigMapping.NamingStrategy.KEBAB_CASE) +public interface PubSubQuarkusOptions { + + /** + * Gets the Google Cloud Project identifier. + * + * @return The identifier. + */ + Optional projectId(); + + /** + * Gets the Pub Sub configuration. + * + * @return The Pub Sub configuration. + */ + PubSubConfig pubsub(); + + /** + * The Pub Sub configuration. + */ + @ConfigMapping(prefix = "pubsub") + interface PubSubConfig { + /** + * Gets the Pub Sub Emulator Host. + * + * @return The host. + */ + Optional emulatorHost(); + } +} diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/CachingPubSubPublisherFactory.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/CachingPubSubPublisherFactory.java index 86c27b58ca..59b7b3fb7f 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/CachingPubSubPublisherFactory.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/CachingPubSubPublisherFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2022 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -20,6 +20,7 @@ import java.util.function.Supplier; import org.eclipse.hono.client.ServerErrorException; +import org.eclipse.hono.client.pubsub.PubSubConfigProperties; import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import com.google.api.gax.core.CredentialsProvider; @@ -34,7 +35,7 @@ public final class CachingPubSubPublisherFactory implements PubSubPublisherFacto private final Vertx vertx; private final Map activePublishers = new ConcurrentHashMap<>(); - private final String projectId; + private final PubSubConfigProperties pubSubConfigProperties; private final CredentialsProvider credentialsProvider; private Supplier clientSupplier; @@ -42,17 +43,18 @@ public final class CachingPubSubPublisherFactory implements PubSubPublisherFacto * Creates a new factory for {@link PubSubPublisherClient} instances. * * @param vertx The Vert.x instance that this factory runs on. - * @param projectId The identifier of the Google Cloud Project to connect to. + * @param pubSubConfigProperties The Pub/Sub configuration properties. * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. * @throws NullPointerException if any of the parameter is {@code null}. */ public CachingPubSubPublisherFactory( final Vertx vertx, - final String projectId, + final PubSubConfigProperties pubSubConfigProperties, final CredentialsProvider credentialsProvider) { this.vertx = Objects.requireNonNull(vertx); - this.projectId = Objects.requireNonNull(projectId); + this.pubSubConfigProperties = Objects.requireNonNull(pubSubConfigProperties); this.credentialsProvider = Objects.requireNonNull(credentialsProvider); + Objects.requireNonNull(pubSubConfigProperties.getProjectId()); } /** @@ -102,7 +104,7 @@ public Optional getPublisher(final String topic, final St private PubSubPublisherClient getPubSubPublisherClient(final String topic) { return Optional.ofNullable(clientSupplier) .map(Supplier::get) - .orElseGet(() -> new PubSubPublisherClientImpl(vertx, projectId, topic, credentialsProvider)); + .orElseGet(() -> new PubSubPublisherClientImpl(vertx, pubSubConfigProperties, topic, credentialsProvider)); } private Future removePublisher(final String topicName) { diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/PubSubPublisherClientImpl.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/PubSubPublisherClientImpl.java index a7fd7b87ef..40a8846dda 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/PubSubPublisherClientImpl.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/PubSubPublisherClientImpl.java @@ -21,6 +21,8 @@ import org.eclipse.hono.client.ClientErrorException; import org.eclipse.hono.client.ServerErrorException; import org.eclipse.hono.client.ServiceInvocationException; +import org.eclipse.hono.client.pubsub.PubSubConfigProperties; +import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +30,7 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; import com.google.cloud.pubsub.v1.Publisher; import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.PubsubMessage; @@ -39,7 +42,7 @@ import io.vertx.core.Vertx; /** - * A client for publishing messages to Pub/Sub + * A client for publishing messages to Pub/Sub. *

* Wraps a Pub/Sub publisher. *

@@ -55,28 +58,37 @@ final class PubSubPublisherClientImpl implements PubSubPublisherClient { * based on a created TopicName, which follows the format: projects/projectId/topics/topic. * * @param vertx The Vert.x instance that this publisher runs on. - * @param projectId The Google project id to use. + * @param pubSubConfigProperties The Pub/Sub configuration properties. * @param topic The topic to create the publisher for. - * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service - * or {@code null} if the default provider should be used. + * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. + * If the emulator is used, provider will be overwritten. + * If {@code null} the default provider should be used. * @throws ClientErrorException if the initialization of the Publisher failed. * @throws NullPointerException if any of project ID or topic are {@code null}. */ PubSubPublisherClientImpl( final Vertx vertx, - final String projectId, + final PubSubConfigProperties pubSubConfigProperties, final String topic, final CredentialsProvider credentialsProvider) throws ClientErrorException { this.vertx = Objects.requireNonNull(vertx); - Objects.requireNonNull(projectId); Objects.requireNonNull(topic); + Objects.requireNonNull(pubSubConfigProperties); + Objects.requireNonNull(pubSubConfigProperties.getProjectId()); try { - final TopicName topicName = TopicName.of(projectId, topic); - final var builder = Publisher.newBuilder(topicName) - .setEnableMessageOrdering(true); - Optional.ofNullable(credentialsProvider).ifPresent(builder::setCredentialsProvider); + final TopicName topicName = TopicName.of(pubSubConfigProperties.getProjectId(), topic); + final var builder = Publisher.newBuilder(topicName).setEnableMessageOrdering(true); + + if (pubSubConfigProperties.isEmulatorHostConfigured()) { + final var channelProvider = PubSubMessageHelper.getTransportChannelProvider(pubSubConfigProperties); + builder + .setChannelProvider(channelProvider) + .setCredentialsProvider(NoCredentialsProvider.create()); + } else { + Optional.ofNullable(credentialsProvider).ifPresent(builder::setCredentialsProvider); + } this.publisher = builder.build(); } catch (final IOException e) { this.publisher = null; diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactory.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactory.java index d6e8687cab..da1c0a41d1 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactory.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactory.java @@ -20,6 +20,7 @@ import java.util.function.Supplier; import org.eclipse.hono.client.ServerErrorException; +import org.eclipse.hono.client.pubsub.PubSubConfigProperties; import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import com.google.api.gax.core.CredentialsProvider; @@ -35,25 +36,25 @@ public class CachingPubSubSubscriberFactory implements PubSubSubscriberFactory { private final Vertx vertx; private final Map activeSubscribers = new ConcurrentHashMap<>(); - private final String projectId; + private final PubSubConfigProperties pubSubConfigProperties; private final CredentialsProvider credentialsProvider; private Supplier clientSupplier; /** * Creates a new factory for {@link PubSubSubscriberClient} instances. * - * @param vertx The Vert.x instance that this factory runs on. - * @param projectId The identifier of the Google Cloud Project to connect to. - * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. + * @param vertx The Vert.x instance that this factory runs on. + * @param pubSubConfigProperties The Pub/Sub configuration properties. + * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. * @throws NullPointerException If any of the parameters is {@code null}. */ public CachingPubSubSubscriberFactory( final Vertx vertx, - final String projectId, - final CredentialsProvider credentialsProvider) { + final PubSubConfigProperties pubSubConfigProperties, final CredentialsProvider credentialsProvider) { this.vertx = Objects.requireNonNull(vertx); - this.projectId = Objects.requireNonNull(projectId); + this.pubSubConfigProperties = Objects.requireNonNull(pubSubConfigProperties); this.credentialsProvider = Objects.requireNonNull(credentialsProvider); + Objects.requireNonNull(pubSubConfigProperties.getProjectId()); } /** @@ -99,7 +100,7 @@ private PubSubSubscriberClient createPubSubSubscriber(final String subscriptionI final MessageReceiver receiver) { return Optional.ofNullable(clientSupplier) .map(Supplier::get) - .orElseGet(() -> new PubSubSubscriberClientImpl(vertx, projectId, subscriptionId, receiver, credentialsProvider)); + .orElseGet(() -> new PubSubSubscriberClientImpl(vertx, pubSubConfigProperties, subscriptionId, receiver, credentialsProvider)); } private Future removeSubscriber(final String subscriptionId) { diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/PubSubSubscriberClientImpl.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/PubSubSubscriberClientImpl.java index 207ecf1f0a..6d0ef4d5ab 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/PubSubSubscriberClientImpl.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/PubSubSubscriberClientImpl.java @@ -14,10 +14,13 @@ import java.util.Objects; +import org.eclipse.hono.client.pubsub.PubSubConfigProperties; +import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Subscriber; import com.google.pubsub.v1.ProjectSubscriptionName; @@ -38,16 +41,16 @@ public class PubSubSubscriberClientImpl implements PubSubSubscriberClient { * The number of milliseconds to wait before retrying to subscribe to a subscription. */ private static final int SUBSCRIBE_RETRY_DELAY_MILLIS = 60000; - private final Logger log = LoggerFactory.getLogger(PubSubSubscriberClientImpl.class); - private final Subscriber subscriber; + private static final Logger LOG = LoggerFactory.getLogger(PubSubSubscriberClientImpl.class); private final Vertx vertx; + private final Subscriber subscriber; /** * Creates a new instance of PubSubSubscriberClientImpl where a Pub/Sub Subscriber is initialized. The Subscriber is * based on a created subscription, which follows the format: projects/{project}/subscriptions/{subscription} * * @param vertx The Vert.x instance that this subscriber runs on. - * @param projectId The identifier of the Google Cloud Project to connect to. + * @param pubSubConfigProperties The Pub/Sub configuration properties. * @param subscriptionId The name of the subscription to create the subscriber for. * @param receiver The message receiver used to process the received message. * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. @@ -55,21 +58,31 @@ public class PubSubSubscriberClientImpl implements PubSubSubscriberClient { */ public PubSubSubscriberClientImpl( final Vertx vertx, - final String projectId, + final PubSubConfigProperties pubSubConfigProperties, final String subscriptionId, final MessageReceiver receiver, final CredentialsProvider credentialsProvider) { this.vertx = Objects.requireNonNull(vertx); - Objects.requireNonNull(projectId); Objects.requireNonNull(subscriptionId); Objects.requireNonNull(receiver); Objects.requireNonNull(credentialsProvider); + Objects.requireNonNull(pubSubConfigProperties); + Objects.requireNonNull(pubSubConfigProperties.getProjectId()); + + final ProjectSubscriptionName subscriptionName = ProjectSubscriptionName + .of(pubSubConfigProperties.getProjectId(), subscriptionId); + final var builder = Subscriber.newBuilder(subscriptionName, receiver); - final ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); - this.subscriber = Subscriber - .newBuilder(subscriptionName, receiver) - .setCredentialsProvider(credentialsProvider) - .build(); + if (pubSubConfigProperties.isEmulatorHostConfigured()) { + final var channelProvider = PubSubMessageHelper.getTransportChannelProvider(pubSubConfigProperties); + builder + .setChannelProvider(channelProvider) + .setCredentialsProvider(NoCredentialsProvider.create()); + } else { + builder.setCredentialsProvider(credentialsProvider); + + } + this.subscriber = builder.build(); } /** @@ -88,15 +101,15 @@ public Future subscribe(final boolean keepTrying) { private void subscribeWithRetries(final Promise resultPromise, final boolean keepTrying) { try { subscriber.startAsync().awaitRunning(); - log.info("Successfully subscribing on: {}", subscriber.getSubscriptionNameString()); + LOG.info("Successfully subscribing on: {}", subscriber.getSubscriptionNameString()); resultPromise.complete(); } catch (Exception e) { if (keepTrying) { - log.info("Error subscribing message from Pub/Sub, will retry in {}ms: ", SUBSCRIBE_RETRY_DELAY_MILLIS, + LOG.info("Error subscribing message from Pub/Sub, will retry in {}ms: ", SUBSCRIBE_RETRY_DELAY_MILLIS, e); vertx.setTimer(SUBSCRIBE_RETRY_DELAY_MILLIS, tid -> subscribeWithRetries(resultPromise, keepTrying)); } else { - log.error("Error subscribing message from Pub/Sub", e); + LOG.error("Error subscribing message from Pub/Sub", e); resultPromise.fail(e); } } diff --git a/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/publisher/CachingPubSubPublisherFactoryTest.java b/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/publisher/CachingPubSubPublisherFactoryTest.java index 8a2865ac41..5e0bdfd642 100644 --- a/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/publisher/CachingPubSubPublisherFactoryTest.java +++ b/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/publisher/CachingPubSubPublisherFactoryTest.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2022 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -13,11 +13,13 @@ package org.eclipse.hono.client.pubsub.publisher; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static com.google.common.truth.Truth.assertThat; import java.util.Optional; +import org.eclipse.hono.client.pubsub.PubSubConfigProperties; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -47,7 +49,9 @@ void setUp() { topic = String.format("%s.%s", TENANT_ID, TOPIC_NAME); client = mock(PubSubPublisherClient.class); final CredentialsProvider credentialsProvider = mock(CredentialsProvider.class); - factory = new CachingPubSubPublisherFactory(vertx, PROJECT_ID, credentialsProvider); + final PubSubConfigProperties pubSubConfigProperties = mock(PubSubConfigProperties.class); + when(pubSubConfigProperties.getProjectId()).thenReturn(PROJECT_ID); + factory = new CachingPubSubPublisherFactory(vertx, pubSubConfigProperties, credentialsProvider); factory.setClientSupplier(() -> client); } diff --git a/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactoryTest.java b/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactoryTest.java index 5c57bdc199..463a05124a 100644 --- a/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactoryTest.java +++ b/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactoryTest.java @@ -13,11 +13,13 @@ package org.eclipse.hono.client.pubsub.subscriber; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static com.google.common.truth.Truth.assertThat; import java.util.Optional; +import org.eclipse.hono.client.pubsub.PubSubConfigProperties; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -46,10 +48,12 @@ public class CachingPubSubSubscriberFactoryTest { void setUp() { final Vertx vertx = mock(Vertx.class); final CredentialsProvider credentialsProvider = mock(CredentialsProvider.class); + final PubSubConfigProperties pubSubConfigProperties = mock(PubSubConfigProperties.class); + when(pubSubConfigProperties.getProjectId()).thenReturn(PROJECT_ID); topic = String.format("%s.%s", TENANT_ID, TOPIC_NAME); receiver = mock(MessageReceiver.class); client = mock(PubSubSubscriberClient.class); - factory = new CachingPubSubSubscriberFactory(vertx, PROJECT_ID, credentialsProvider); + factory = new CachingPubSubSubscriberFactory(vertx, pubSubConfigProperties, credentialsProvider); factory.setClientSupplier(() -> client); } diff --git a/service-base/src/main/java/org/eclipse/hono/service/NotificationSupportingServiceApplication.java b/service-base/src/main/java/org/eclipse/hono/service/NotificationSupportingServiceApplication.java index cd3c064ab6..f18cdfb10a 100644 --- a/service-base/src/main/java/org/eclipse/hono/service/NotificationSupportingServiceApplication.java +++ b/service-base/src/main/java/org/eclipse/hono/service/NotificationSupportingServiceApplication.java @@ -46,7 +46,8 @@ public abstract class NotificationSupportingServiceApplication extends AbstractS * @param amqpNotificationConfig The AMQP 1.0 connection properties. * @param pubSubConfigProperties The Pub/Sub connection properties. * @return the receiver. - * @throws IllegalStateException if both AMQP and Kafka based messaging have been disabled explicitly. + * @throws IllegalStateException if both AMQP and Kafka based messaging have been disabled and pubsub is not set up + * properly. */ protected NotificationReceiver notificationReceiver( final NotificationKafkaConsumerConfigProperties kafkaNotificationConfig, @@ -61,13 +62,13 @@ protected NotificationReceiver notificationReceiver( notificationReceiver = new ProtonBasedNotificationReceiver( HonoConnection.newConnection(vertx, notificationConfig, tracer)); } else { - final Optional credentialsProvider = PubSubMessageHelper.getCredentialsProvider(); + final Optional credentialsProvider = PubSubMessageHelper.getCredentialsProvider(pubSubConfigProperties); if (!appConfig.isPubSubMessagingDisabled() && pubSubConfigProperties.isProjectIdConfigured() && credentialsProvider.isPresent()) { final var factory = new CachingPubSubSubscriberFactory( vertx, - pubSubConfigProperties.getProjectId(), - credentialsProvider.get()); + pubSubConfigProperties, credentialsProvider.get() + ); notificationReceiver = new PubSubBasedNotificationReceiver(factory); } else { throw new IllegalStateException("at least one of Kafka, AMQP or Pub/Sub messaging must be configured"); diff --git a/services/command-router-base/src/main/java/org/eclipse/hono/commandrouter/app/AbstractApplication.java b/services/command-router-base/src/main/java/org/eclipse/hono/commandrouter/app/AbstractApplication.java index bb8d5fca50..ae81eefa25 100644 --- a/services/command-router-base/src/main/java/org/eclipse/hono/commandrouter/app/AbstractApplication.java +++ b/services/command-router-base/src/main/java/org/eclipse/hono/commandrouter/app/AbstractApplication.java @@ -34,7 +34,7 @@ import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties; import org.eclipse.hono.client.pubsub.PubSubConfigProperties; import org.eclipse.hono.client.pubsub.PubSubMessageHelper; -import org.eclipse.hono.client.pubsub.PubSubPublisherOptions; +import org.eclipse.hono.client.pubsub.PubSubQuarkusOptions; import org.eclipse.hono.client.pubsub.publisher.CachingPubSubPublisherFactory; import org.eclipse.hono.client.pubsub.subscriber.CachingPubSubSubscriberFactory; import org.eclipse.hono.client.registry.DeviceRegistrationClient; @@ -133,8 +133,8 @@ public abstract class AbstractApplication extends NotificationSupportingServiceA private PubSubConfigProperties pubSubConfigProperties; @Inject - void setPubSubClientOptions(final PubSubPublisherOptions options) { - this.pubSubConfigProperties = new PubSubConfigProperties(options); + void setPubSubClientOptions(final PubSubQuarkusOptions pubSubQuarkusOptions) { + this.pubSubConfigProperties = new PubSubConfigProperties(pubSubQuarkusOptions); } @Inject @@ -305,8 +305,8 @@ protected void doStart() { amqpServerDeploymentTracker, notificationReceiverTracker, topicCleanUpServiceDeploymentTracker) - .map(deploymentResult) - .onComplete(deploymentCheck); + .map(deploymentResult) + .onComplete(deploymentCheck); } private CommandRouterAmqpServer amqpServer() { @@ -411,17 +411,17 @@ private MessagingClientProvider commandConsumerFactoryPr SendMessageSampler.Factory.noop())); } if (!appConfig.isPubSubMessagingDisabled() && pubSubConfigProperties.isProjectIdConfigured()) { - PubSubMessageHelper.getCredentialsProvider() + PubSubMessageHelper.getCredentialsProvider(pubSubConfigProperties) .ifPresentOrElse(provider -> { LOG.debug("Configuring Pub/Sub based command consumer factory"); final var publisherFactory = new CachingPubSubPublisherFactory( vertx, - pubSubConfigProperties.getProjectId(), + pubSubConfigProperties, provider); final var subscriberFactory = new CachingPubSubSubscriberFactory( vertx, - pubSubConfigProperties.getProjectId(), + pubSubConfigProperties, provider); commandConsumerFactoryProvider.setClient(new PubSubBasedCommandConsumerFactoryImpl( vertx, @@ -460,11 +460,11 @@ private MessagingClientProvider eventSenderProvider() { false)); } if (!appConfig.isPubSubMessagingDisabled() && pubSubConfigProperties.isProjectIdConfigured()) { - PubSubMessageHelper.getCredentialsProvider() + PubSubMessageHelper.getCredentialsProvider(pubSubConfigProperties) .ifPresentOrElse(provider -> { final var pubSubFactory = new CachingPubSubPublisherFactory( vertx, - pubSubConfigProperties.getProjectId(), + pubSubConfigProperties, provider); eventSenderProvider.setClient(new PubSubBasedDownstreamSender( diff --git a/services/command-router-infinispan/src/main/resources/application-dev.properties b/services/command-router-infinispan/src/main/resources/application-dev.properties index c0f42934df..213d271f8e 100644 --- a/services/command-router-infinispan/src/main/resources/application-dev.properties +++ b/services/command-router-infinispan/src/main/resources/application-dev.properties @@ -7,7 +7,7 @@ hono.commandRouter.amqp.insecurePortEnabled=true hono.commandRouter.amqp.insecurePortBindAddress=0.0.0.0 hono.commandRouter.amqp.insecurePort=${fixed.commandrouter.amqp.port} -hono.commandRouter.cache.remote.serverList=${docker.host.address:localhost}:${hono.infinispan.port} +hono.commandRouter.cache.remote.serverList=${docker.host.address:localhost}:${hono.infinispan.port:11222} hono.commandRouter.cache.remote.authServerName=${hono.infinispan.host:localhost} hono.commandRouter.cache.remote.authUsername=${hono.infinispan.username} hono.commandRouter.cache.remote.authPassword=${hono.infinispan.password} @@ -24,5 +24,3 @@ hono.kafka.commandResponse.producerConfig."max.block.ms"=${kafka-client.producer hono.kafka.commandResponse.producerConfig."request.timeout.ms"=${kafka-client.producer.request-timeout-ms} hono.kafka.event.producerConfig."max.block.ms"=${kafka-client.producer.max-block-ms} hono.kafka.event.producerConfig."request.timeout.ms"=${kafka-client.producer.request-timeout-ms} -hono.kafka.notification.producerConfig."max.block.ms"=${kafka-client.producer.max-block-ms} -hono.kafka.notification.producerConfig."request.timeout.ms"=${kafka-client.producer.request-timeout-ms} diff --git a/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/AbstractAmqpServerFactory.java b/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/AbstractAmqpServerFactory.java index 72c439c55c..f6677ef037 100644 --- a/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/AbstractAmqpServerFactory.java +++ b/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/AbstractAmqpServerFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2022 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -263,12 +263,12 @@ private MessagingClientProvider eventSenderProvider() { result.setClient(new KafkaBasedEventSender(vertx, factory, eventKafkaProducerConfig, true, tracer)); } if (!appConfig.isPubSubMessagingDisabled() && pubSubConfigProperties.isProjectIdConfigured()) { - PubSubMessageHelper.getCredentialsProvider() + PubSubMessageHelper.getCredentialsProvider(pubSubConfigProperties) .ifPresent(provider -> { final var factory = new CachingPubSubPublisherFactory( - vertx, - pubSubConfigProperties.getProjectId(), - provider); + vertx, + pubSubConfigProperties, + provider); result.setClient(new PubSubBasedDownstreamSender( vertx, factory, diff --git a/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/NotificationSenderProducer.java b/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/NotificationSenderProducer.java index 2c6075246b..69c4e564fe 100644 --- a/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/NotificationSenderProducer.java +++ b/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/NotificationSenderProducer.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2022 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -84,12 +84,13 @@ NotificationSender notificationSender( downstreamSenderConfig, tracer)); } else { - final Optional credentialsProvider = PubSubMessageHelper.getCredentialsProvider(); + final Optional credentialsProvider = PubSubMessageHelper + .getCredentialsProvider(pubSubConfigProperties); if (!appConfig.isPubSubMessagingDisabled() && pubSubConfigProperties.isProjectIdConfigured() && credentialsProvider.isPresent()) { final PubSubPublisherFactory factory = new CachingPubSubPublisherFactory( vertx, - pubSubConfigProperties.getProjectId(), + pubSubConfigProperties, credentialsProvider.get()); notificationSender = new PubSubBasedNotificationSender( factory, diff --git a/test-utils/pubsub-test-util/README.md b/test-utils/pubsub-test-util/README.md new file mode 100644 index 0000000000..f0433c7fcd --- /dev/null +++ b/test-utils/pubsub-test-util/README.md @@ -0,0 +1,4 @@ +Run this to set up topics and subscriptions on the local Google Cloud PubSub Emulator + + +More details on the emulator and local PubSub development are located [here](../../clients/pubsub-common/readme.md) \ No newline at end of file diff --git a/test-utils/pubsub-test-util/pom.xml b/test-utils/pubsub-test-util/pom.xml new file mode 100644 index 0000000000..f01a9cacfa --- /dev/null +++ b/test-utils/pubsub-test-util/pom.xml @@ -0,0 +1,46 @@ + + + + 4.0.0 + + org.eclipse.hono + test-utils + 2.7.0-SNAPSHOT + + pubsub-test-util + + Hono Google Pub Sub Emulator Test Utils + Helper classes for testing locally with the pubsub emulator + https://www.eclipse.org/hono + + + + org.eclipse.hono + hono-client-pubsub-common + + + io.quarkiverse.googlecloudservices + quarkus-google-cloud-pubsub + ${quarkiverse.googlecloudservices.version} + + + commons-logging + commons-logging + + + + + + diff --git a/test-utils/pubsub-test-util/src/main/java/org/eclipse/hono/test/pubsub/PubSubEmulatorSetup.java b/test-utils/pubsub-test-util/src/main/java/org/eclipse/hono/test/pubsub/PubSubEmulatorSetup.java new file mode 100644 index 0000000000..1fa9d16f3f --- /dev/null +++ b/test-utils/pubsub-test-util/src/main/java/org/eclipse/hono/test/pubsub/PubSubEmulatorSetup.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ + +package org.eclipse.hono.test.pubsub; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager; +import org.eclipse.hono.client.pubsub.PubSubConfigProperties; +import org.eclipse.hono.client.pubsub.PubSubQuarkusOptions; + +import com.google.api.gax.core.NoCredentialsProvider; + +import io.vertx.core.Vertx; + +/** + * A class to set up topics and subscription for local pubsub emulator. + */ +public class PubSubEmulatorSetup { + + private PubSubEmulatorSetup() { + + } + + /** + * main used to set up the topics / subscription in the emulator. + * @param args - + * @throws IOException - + */ + public static void main(final String[] args) throws IOException { + final String projectId = "local-test-project"; + final String hostPort = "localhost:8085"; // Emulator address + final NoCredentialsProvider credentialsProvider = NoCredentialsProvider.create(); + final var pubSubQuarkusOptions = new PubSubQuarkusOptions() { + @Override + public Optional projectId() { + return Optional.of(projectId); + } + + @Override + public PubSubQuarkusOptions.PubSubConfig pubsub() { + return () -> Optional.of(hostPort); + } + }; + final var vertx = Vertx.vertx(); + // Use try-with-resources to automatically close the clients + final PubSubBasedAdminClientManager clientManager = new PubSubBasedAdminClientManager(new PubSubConfigProperties(pubSubQuarkusOptions), credentialsProvider, vertx); + try { + clientManager.getOrCreateTopic("notification", "registry-tenant"); + clientManager.getOrCreateTopic("notification", "registry-device"); + clientManager.getOrCreateSubscription("notification", "registry-tenant"); + clientManager.getOrCreateSubscription("notification", "registry-device"); + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + System.out.println((e.getMessage())); + } + try { + clientManager.closeAdminClients(); + TimeUnit.SECONDS.sleep(1); + vertx.close(); + TimeUnit.SECONDS.sleep(1); + } catch (Exception e) { + System.out.println((e.getMessage())); + } + } +} diff --git a/tests/pom.xml b/tests/pom.xml index e6ce218bb8..32ac45f9b2 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -31,6 +31,7 @@ https://www.eclipse.org/hono + ${project.version} true true true @@ -431,7 +432,7 @@ CI - @@ -734,7 +735,7 @@ false - + local @@ -853,7 +854,7 @@ - ${docker.repository}/hono-infinispan-test:${project.version} + ${docker.repository}/hono-infinispan-test:${container.version} ${hono.infinispan.disabled} IfNotPresent @@ -913,7 +914,7 @@ - ${docker.repository}/hono-jaeger-test:${project.version} + ${docker.repository}/hono-jaeger-test:${container.version} ${jaeger.disabled} IfNotPresent @@ -978,11 +979,11 @@ - ${docker.repository}/hono-service-auth-test:${project.version} + ${docker.repository}/hono-service-auth-test:${container.version} ${hono.service-auth.disabled} IfNotPresent - ${docker.repository}/${hono.auth-server.image}:${project.version} + ${docker.repository}/${hono.auth-server.image}:${container.version} dir / @@ -1071,7 +1072,7 @@ - ${docker.repository}/hono-artemis-test:${project.version} + ${docker.repository}/hono-artemis-test:${container.version} ${hono.amqp-messaging.disabled} IfNotPresent @@ -1136,7 +1137,7 @@ - ${docker.repository}/hono-dispatch-router-test:${project.version} + ${docker.repository}/hono-dispatch-router-test:${container.version} ${hono.amqp-messaging.disabled} IfNotPresent @@ -1255,7 +1256,7 @@ - ${docker.repository}/hono-mongodb-test:${project.version} + ${docker.repository}/hono-mongodb-test:${container.version} ${hono.mongodb.disabled} IfNotPresent @@ -1310,11 +1311,11 @@ - ${docker.repository}/hono-service-device-registry-test:${project.version} + ${docker.repository}/hono-service-device-registry-test:${container.version} ${hono.deviceregistry.mongodb.disabled} IfNotPresent - ${docker.repository}/${hono.deviceregistry.mongodb.image}:${project.version} + ${docker.repository}/${hono.deviceregistry.mongodb.image}:${container.version} dir / @@ -1416,11 +1417,11 @@ - ${docker.repository}/hono-service-device-registry-test:${project.version} + ${docker.repository}/hono-service-device-registry-test:${container.version} ${hono.deviceregistry.jdbc.disabled} IfNotPresent - ${docker.repository}/${hono.deviceregistry.jdbc.image}:${project.version} + ${docker.repository}/${hono.deviceregistry.jdbc.image}:${container.version} dir / @@ -1493,11 +1494,11 @@ - ${docker.repository}/hono-service-command-router-test:${project.version} + ${docker.repository}/hono-service-command-router-test:${container.version} ${hono.command-router.disabled} IfNotPresent - ${docker.repository}/${hono.command-router.image}:${project.version} + ${docker.repository}/${hono.command-router.image}:${container.version} dir / @@ -1574,11 +1575,11 @@ - ${docker.repository}/hono-adapter-http-test:${project.version} + ${docker.repository}/hono-adapter-http-test:${container.version} ${hono.http-adapter.disabled} IfNotPresent - ${docker.repository}/${hono.http-adapter.image}:${project.version} + ${docker.repository}/${hono.http-adapter.image}:${container.version} dir / @@ -1661,11 +1662,11 @@ - ${docker.repository}/hono-adapter-mqtt-test:${project.version} + ${docker.repository}/hono-adapter-mqtt-test:${container.version} ${hono.mqtt-adapter.disabled} IfNotPresent - ${docker.repository}/${hono.mqtt-adapter.image}:${project.version} + ${docker.repository}/${hono.mqtt-adapter.image}:${container.version} dir / @@ -1737,11 +1738,11 @@ - ${docker.repository}/hono-adapter-amqp-test:${project.version} + ${docker.repository}/hono-adapter-amqp-test:${container.version} ${hono.amqp-adapter.disabled} IfNotPresent - ${docker.repository}/${hono.amqp-adapter.image}:${project.version} + ${docker.repository}/${hono.amqp-adapter.image}:${container.version} dir / @@ -1814,11 +1815,11 @@ - ${docker.repository}/hono-adapter-coap-test:${project.version} + ${docker.repository}/hono-adapter-coap-test:${container.version} ${hono.coap-adapter.disabled} IfNotPresent - ${docker.repository}/${hono.coap-adapter.image}:${project.version} + ${docker.repository}/${hono.coap-adapter.image}:${container.version} dir / @@ -1890,11 +1891,11 @@ - ${docker.repository}/hono-adapter-lora-test:${project.version} + ${docker.repository}/hono-adapter-lora-test:${container.version} ${hono.lora-adapter.disabled} IfNotPresent - ${docker.repository}/${hono.lora-adapter.image}:${project.version} + ${docker.repository}/${hono.lora-adapter.image}:${container.version} dir /