diff --git a/antora.yml b/antora.yml index 9e77f227..e255a029 100644 --- a/antora.yml +++ b/antora.yml @@ -17,4 +17,16 @@ asciidoc: starlight-rabbitmq: 'Starlight for RabbitMQ' pulsar-reg: 'Apache Pulsar(TM)' pulsar: 'Apache Pulsar' - pulsar-short: 'Pulsar' \ No newline at end of file + pulsar-short: 'Pulsar' + + # Required for include::common partials that are shared with Astra Streaming + web-ui: 'Admin Console' + astra: 'Astra' + astra-db: 'Astra DB' + scb: 'Secure Connect Bundle (SCB)' + scb-short: 'SCB' + scb-brief: 'Secure Connect Bundle' + cass-reg: 'Apache Cassandra(R)' + cass: 'Apache Cassandra' + cass-short: 'Cassandra' + debezium-version: '1.7' \ No newline at end of file diff --git a/modules/ROOT/nav.adoc b/modules/ROOT/nav.adoc index 155a7a7d..cd44410e 100644 --- a/modules/ROOT/nav.adoc +++ b/modules/ROOT/nav.adoc @@ -19,7 +19,21 @@ * xref:operations:auth.adoc[] * xref:operations:functions.adoc[] * xref:operations:functions-transform.adoc[] -* xref:operations:io-connectors.adoc[] -* xref:operations:io-elastic-sink.adoc[] +* I/O connectors +** xref:connectors:index.adoc[] +** Sinks +*** xref:connectors:sinks/astra-db.adoc[] +*** xref:connectors:sinks/elastic-search.adoc[] +*** xref:connectors:sinks/jdbc-clickhouse.adoc[] +*** xref:connectors:sinks/jdbc-mariadb.adoc[] +*** xref:connectors:sinks/jdbc-postgres.adoc[] +*** xref:connectors:sinks/kafka.adoc[] +*** xref:connectors:sinks/kinesis.adoc[] +** Sources +*** xref:connectors:sources/debezium-mongodb.adoc[] +*** xref:connectors:sources/debezium-mysql.adoc[] +*** xref:connectors:sources/debezium-postgres.adoc[] +*** xref:connectors:sources/kafka.adoc[] +*** xref:connectors:sources/kinesis.adoc[] * xref:operations:scale-cluster.adoc[] * xref:operations:troubleshooting.adoc[] \ No newline at end of file diff --git a/modules/ROOT/pages/faqs.adoc b/modules/ROOT/pages/faqs.adoc index d3641d4a..fbe3bc44 100644 --- a/modules/ROOT/pages/faqs.adoc +++ b/modules/ROOT/pages/faqs.adoc @@ -80,7 +80,7 @@ 
https://github.com/datastax/pulsar-heartbeat[{pulsar-short} Heartbeat] monitors == What are the features provided by {company} {pulsar} Connector (`pulsar-sink`) that are not supported in `kafka-sink`? -The https://pulsar.apache.org/docs/en/io-overview/[{pulsar-short} IO framework] provides many features that are not possible in Kafka, and has different compression formats and auth/security features. The features are handled by {pulsar-short}. For more, see xref:operations:io-connectors.adoc[Luna Streaming IO Connectors]. +The https://pulsar.apache.org/docs/en/io-overview/[{pulsar-short} IO framework] provides many features that are not possible in Kafka, and has different compression formats and auth/security features. The features are handled by {pulsar-short}. For more, see xref:connectors:index.adoc[Luna Streaming IO Connectors]. The {company} {pulsar} Connector allows single-record acknowledgement and negative acknowledgements. diff --git a/modules/ROOT/pages/index.adoc b/modules/ROOT/pages/index.adoc index 3aa200a6..75857f4a 100644 --- a/modules/ROOT/pages/index.adoc +++ b/modules/ROOT/pages/index.adoc @@ -17,7 +17,7 @@ In addition to the distribution of https://pulsar.apache.org/en/versions/[{pulsa * A xref:install-upgrade:quickstart-helm-installs.adoc[Helm chart] that deploys and manages {pulsar-short} on your current CNCF-conformant Kubernetes infrastructure -* Cassandra, Elastic, Kinesis, Kafka, and JDBC xref:operations:io-connectors.adoc[connectors] +* Cassandra, Elastic, Kinesis, Kafka, and JDBC xref:connectors:index.adoc[connectors] * xref:components:admin-console-vm.adoc[{pulsar-short} Admin Console] for simplified administration of your {pulsar-short} environment diff --git a/modules/ROOT/partials/sinks/edit.adoc b/modules/ROOT/partials/sinks/edit.adoc new file mode 100644 index 00000000..90999bb1 --- /dev/null +++ b/modules/ROOT/partials/sinks/edit.adoc @@ -0,0 +1,17 @@ +To update a connector, pass the new configuration definition to the connector. 
+For example, if you created the connector with a configuration file, you can pass an updated configuration file. + +You can include the entire configuration or only the properties that you want to change. +Additionally, some properties can be modified with specific arguments, such as `--parallelism`. + +To get the current configuration, see xref:connectors:index.adoc#get-sink-connector-configuration-data[Get sink connector configuration data]. + +[source,shell,subs="+attributes"] +---- +./bin/pulsar-admin sinks update \ + --sink-type {connectorType} \ + --name "$SINK_NAME" \ + --inputs "persistent://$TENANT/$NAMESPACE/$TOPIC" \ + --tenant "$TENANT" \ + --parallelism 2 +---- \ No newline at end of file diff --git a/modules/ROOT/partials/sinks/get-started.adoc b/modules/ROOT/partials/sinks/get-started.adoc new file mode 100644 index 00000000..e4acdf41 --- /dev/null +++ b/modules/ROOT/partials/sinks/get-started.adoc @@ -0,0 +1,32 @@ +. Optional: If you are using the `pulsar-admin` CLI, set the following commonly-used environment variables: ++ +[source,shell,subs="+quotes"] +---- +export TENANT="**TENANT_NAME**" +export TOPIC="**INPUT_TOPIC_NAME**" +export NAMESPACE="**NAMESPACE_NAME**" +export SINK_NAME="**SINK_CONNECTOR_UNIQUE_NAME**" +---- ++ +`**SINK_NAME**` is the name for your new sink connector. +{company} recommends a memorable, human-readable name that summarizes the connector's purpose. +For example: `{connectorType}-sink-prod-us-east-1`. + +. Create the connector using JSON-formatted connector configuration data. +You can pass the configuration directly or with a configuration file. 
++ +[source,shell,subs="+attributes"] +---- +./bin/pulsar-admin sinks create \ + --sink-type {connectorType} \ + --name "$SINK_NAME" \ + --inputs "persistent://$TENANT/$NAMESPACE/$TOPIC" \ + --tenant "$TENANT" \ + --sink-config-file configs.json +---- ++ +.Example configuration data structure +[source,json] +---- +include::common:streaming:example$connectors/sinks/{connectorType}/sample-data.json[] +---- \ No newline at end of file diff --git a/modules/ROOT/partials/sources/edit.adoc b/modules/ROOT/partials/sources/edit.adoc new file mode 100644 index 00000000..b51ea3c2 --- /dev/null +++ b/modules/ROOT/partials/sources/edit.adoc @@ -0,0 +1,17 @@ +To update a connector, pass the new configuration definition to the connector. +For example, if you created the connector with a configuration file, you can pass an updated configuration file. + +You can include the entire configuration or only the properties that you want to change. +Additionally, some properties can be modified with specific arguments, such as `--parallelism`. + +To get the current configuration, see xref:connectors:index.adoc#get-source-connector-configuration-data[Get source connector configuration data]. + +[source,shell,subs="+attributes"] +---- +./bin/pulsar-admin sources update \ + --source-type {connectorType} \ + --name "$SOURCE_NAME" \ + --destination-topic-name "persistent://$TENANT/$NAMESPACE/$TOPIC" \ + --tenant "$TENANT" \ + --parallelism 2 +---- \ No newline at end of file diff --git a/modules/ROOT/partials/sources/get-started.adoc b/modules/ROOT/partials/sources/get-started.adoc new file mode 100644 index 00000000..fb4cd25c --- /dev/null +++ b/modules/ROOT/partials/sources/get-started.adoc @@ -0,0 +1,32 @@ +. 
Optional: If you are using the `pulsar-admin` CLI, set the following commonly-used environment variables: ++ +[source,shell,subs="+quotes"] +---- +export TENANT="**TENANT_NAME**" +export TOPIC="**OUTPUT_TOPIC_NAME**" +export NAMESPACE="**NAMESPACE_NAME**" +export SOURCE_NAME="**SOURCE_CONNECTOR_UNIQUE_NAME**" +---- ++ +`**SOURCE_NAME**` is the name for your new source connector. +{company} recommends a memorable, human-readable name that summarizes the connector's purpose. +For example: `{connectorType}-source-prod-us-east-1`. + +. Create the connector using JSON-formatted connector configuration data. +You can pass the configuration directly or with a configuration file. ++ +[source,shell,subs="+attributes"] +---- +./bin/pulsar-admin sources create \ + --source-type {connectorType} \ + --name "$SOURCE_NAME" \ + --destination-topic-name "persistent://$TENANT/$NAMESPACE/$TOPIC" \ + --tenant "$TENANT" \ + --source-config-file configs.json +---- ++ +.Example configuration data structure +[source,json] +---- +include::common:streaming:example$connectors/sources/{connectorType}/sample-data.json[] +---- \ No newline at end of file diff --git a/modules/connectors/pages/index.adoc b/modules/connectors/pages/index.adoc new file mode 100644 index 00000000..94a1ea2e --- /dev/null +++ b/modules/connectors/pages/index.adoc @@ -0,0 +1,224 @@ += {pulsar-short} I/O connectors in {product} +:navtitle: {pulsar-short} I/O connectors reference +:page-aliases: operations:io-connectors.adoc + +{product} offers fully-managed versions of https://pulsar.apache.org/docs/en/io-overview/[{pulsar-reg} sink and source connectors]. + +[TIP] +==== +There are three versions of the {company} Luna Streaming distribution. +The `lunastreaming-all` version includes all connectors. +==== + +[#sink-connectors] +== Sink connectors + +Sink connectors export messages from {pulsar-short} topics to external services. 
+
+Supported sink connectors::
+The following sink connectors are included in {product}:
++
+* xref:connectors:sinks/astra-db.adoc[{astra-db} and {cass} sink]
+* xref:connectors:sinks/kafka.adoc[Apache Kafka sink]
+* xref:connectors:sinks/kinesis.adoc[AWS Kinesis sink]
+* xref:connectors:sinks/elastic-search.adoc[Elasticsearch sink]
+* xref:connectors:sinks/jdbc-clickhouse.adoc[JDBC ClickHouse sink]
+* xref:connectors:sinks/jdbc-mariadb.adoc[JDBC MariaDB sink]
+* xref:connectors:sinks/jdbc-postgres.adoc[JDBC PostgreSQL sink]
+
+[#source-connectors]
+== Source connectors
+
+Source connectors ingest messages from external services into {pulsar-short} topics.
+
+Supported source connectors::
+The following source connectors are included in {product}:
++
+* xref:connectors:sources/kafka.adoc[Apache Kafka source]
+* xref:connectors:sources/kinesis.adoc[AWS Kinesis source]
+* xref:connectors:sources/debezium-mongodb.adoc[Debezium MongoDB source]
+* xref:connectors:sources/debezium-mysql.adoc[Debezium MySQL source]
+* xref:connectors:sources/debezium-postgres.adoc[Debezium PostgreSQL source]
+
+[#create-and-manage-connectors]
+== Create and manage connectors
+
+You can use the {web-ui} and the `xref:components:admin-console-tutorial.adoc[pulsar-admin]` CLI to create, monitor, and manage sink and source connectors.
+
+Although you use the same base commands to create and update all {product} {pulsar-short} connectors, each connector has different configuration options.
+For example commands and configuration details, see the documentation for your preferred <<sink-connectors,sink connector>> and <<source-connectors,source connector>>.
+ +=== `pulsar-admin` CLI sink operations + +Get available sink connectors:: +Get a list of sink connectors that are available in your {product} {pulsar-short} tenant: ++ +[source,shell] +---- +./bin/pulsar-admin sinks available-sinks +---- + +[#get-sink-connector-configuration-data] +Get sink connector configuration data:: +Get the configuration for an existing sink connector: ++ +[source,shell] +---- +# Get information about a connector +./bin/pulsar-admin sinks get \ + --namespace "$NAMESPACE" \ + --name "$SINK_NAME" \ + --tenant "$TENANT" +---- + +Start a sink connector:: ++ +[source,shell] +---- +# Start all instances of a connector +./bin/pulsar-admin sinks start \ + --namespace "$NAMESPACE" \ + --name "$SINK_NAME" \ + --tenant "$TENANT" + +# optionally add --instance-id to only start an individual instance +---- + +Stop a sink connector:: ++ +[source,shell] +---- +# Stop all instances of a connector +./bin/pulsar-admin sinks stop \ + --namespace "$NAMESPACE" \ + --name "$SINK_NAME" \ + --tenant "$TENANT" + +# optionally add --instance-id to only stop an individual instance +---- + +Restart a sink connector:: ++ +[source,shell] +---- +# Restart all instances of a connector +./bin/pulsar-admin sinks restart \ + --namespace "$NAMESPACE" \ + --name "$SINK_NAME" \ + --tenant "$TENANT" + +# optionally add --instance-id to only restart an individual instance +---- + +Get sink connector status:: ++ +[source,shell] +---- +# Check connector status +./bin/pulsar-admin sinks status \ + --instance-id "$SINK_INSTANCE_ID" \ + --namespace "$NAMESPACE" \ + --name "$SINK_NAME" \ + --tenant "$TENANT" +---- + +Delete a sink connector:: ++ +[source,shell] +---- +# Delete all instances of a connector +./bin/pulsar-admin sinks delete \ + --namespace "$NAMESPACE" \ + --name "$SINK_NAME" \ + --tenant "$TENANT" +---- + +=== `pulsar-admin` CLI source operations + +Get available source connectors:: +Get a list of source connectors that are available in your {product} {pulsar-short} 
tenant: ++ +[source,shell] +---- +./bin/pulsar-admin sources available-sources +---- + +[#get-source-connector-configuration-data] +Get source connector configuration data:: +Get the configuration for an existing source connector: ++ +[source,shell] +---- +# Get information about connector +./bin/pulsar-admin sources get \ + --namespace "$NAMESPACE" \ + --name "$SOURCE_NAME" \ + --tenant "$TENANT" +---- + +Start a source connector:: ++ +[source,shell] +---- +# Start all instances of a connector +./bin/pulsar-admin sources start \ + --namespace "$NAMESPACE" \ + --name "$SOURCE_NAME" \ + --tenant "$TENANT" + +# optionally add --instance-id to only start an individual instance +---- + +Stop a source connector:: ++ +[source,shell] +---- +# Stop all instances of a connector +./bin/pulsar-admin sources stop \ + --namespace "$NAMESPACE" \ + --name "$SOURCE_NAME" \ + --tenant "$TENANT" + +# optionally add --instance-id to only stop an individual instance +---- + +Restart a source connector:: ++ +[source,shell] +---- +# Restart all instances of a connector +./bin/pulsar-admin sources restart \ + --namespace "$NAMESPACE" \ + --name "$SOURCE_NAME" \ + --tenant "$TENANT" + +# optionally add --instance-id to only restart an individual instance +---- + +Get source connector status:: ++ +[source,shell] +---- +# Check connector status +./bin/pulsar-admin sources status \ + --instance-id "$SOURCE_INSTANCE_ID" \ + --namespace "$NAMESPACE" \ + --name "$SOURCE_NAME" \ + --tenant "$TENANT" +---- + +Delete a source connector:: ++ +[source,shell] +---- +# Delete all instances of a connector +./bin/pulsar-admin sources delete \ + --namespace "$NAMESPACE" \ + --name "$SOURCE_NAME" \ + --tenant "$TENANT" +---- + +== Connector metrics + +{product} exposes Prometheus-formatted metrics for every connector. +For more information, see xref:components:pulsar-monitor.adoc[]. 
\ No newline at end of file
diff --git a/modules/connectors/pages/sinks/astra-db.adoc b/modules/connectors/pages/sinks/astra-db.adoc
new file mode 100644
index 00000000..b98105ed
--- /dev/null
+++ b/modules/connectors/pages/sinks/astra-db.adoc
@@ -0,0 +1,47 @@
+= {astra-db} and {cass}
+:connectorType: cassandra-enhanced
+
+include::common:streaming:partial$connectors/sinks/astra-db-intro.adoc[]
+
+== Create the connector
+
+include::ROOT:partial$sinks/get-started.adoc[]
+
+[TIP]
+====
+The preceding example connects to a self-managed {cass-short} cluster.
+
+For an {astra-db} example, see <<configs>>.
+
+For all connection properties, see <<configs>>.
+====
+
+== Edit the connector
+
+include::ROOT:partial$sinks/edit.adoc[]
+
+== Manage the connector
+
+include::common:streaming:partial$connectors/manage.adoc[]
+
+== Connector configuration reference
+
+include::common:streaming:partial$connectors/connector-params-intro.adoc[]
+
+=== {pulsar-short} sink connector properties
+
+include::common:streaming:partial$connectors/sinks/pulsar-config-params.adoc[]
+
+[#configs]
+=== {company} {pulsar} sink connector properties (`configs`)
+
+include::common:streaming:partial$connectors/sinks/astra-db-properties.adoc[]
+
+[#topic]
+=== Topic-to-table mapping properties (`topic`)
+
+include::common:streaming:partial$connectors/sinks/astra-db-topic.adoc[]
+
+== See also
+
+* https://github.com/datastax/pulsar-sink[{company} {pulsar} connector GitHub repository]
\ No newline at end of file
diff --git a/modules/connectors/pages/sinks/elastic-search.adoc b/modules/connectors/pages/sinks/elastic-search.adoc
new file mode 100644
index 00000000..4a437f66
--- /dev/null
+++ b/modules/connectors/pages/sinks/elastic-search.adoc
@@ -0,0 +1,38 @@
+= Elasticsearch
+:connectorType: elastic_search
+:page-aliases: operations:io-elastic-sink.adoc
+
+The Elasticsearch sink connector reads messages from {pulsar-short} topics and writes them to https://www.elastic.co/elasticsearch/[Elasticsearch].
+
+[#compatibility]
+== Compatibility
+
+include::common:streaming:partial$connectors/sinks/elastic-search-compatibility.adoc[]
+
+== Create the connector
+
+include::ROOT:partial$sinks/get-started.adoc[]
+
+== Edit the connector
+
+include::ROOT:partial$sinks/edit.adoc[]
+
+== Manage the connector
+
+include::common:streaming:partial$connectors/manage.adoc[]
+
+== Connector configuration reference
+
+include::common:streaming:partial$connectors/connector-params-intro.adoc[]
+
+=== {pulsar-short} sink connector properties
+
+include::common:streaming:partial$connectors/sinks/pulsar-config-params.adoc[]
+
+[#configs]
+=== Elasticsearch sink connector properties (`configs`)
+
+Set these properties in the `configs` section of the connector configuration.
+
+Generally, all properties provided in the https://pulsar.apache.org/docs/io-elasticsearch-sink[OSS {pulsar} Elasticsearch sink connector] are supported.
+Exceptions include properties that aren't relevant to {product} and properties that are only present in <<compatibility,unsupported versions>>.
\ No newline at end of file
diff --git a/modules/connectors/pages/sinks/jdbc-clickhouse.adoc b/modules/connectors/pages/sinks/jdbc-clickhouse.adoc
new file mode 100644
index 00000000..c7935fff
--- /dev/null
+++ b/modules/connectors/pages/sinks/jdbc-clickhouse.adoc
@@ -0,0 +1,34 @@
+= JDBC ClickHouse
+:connectorType: jdbc-clickhouse
+
+You can use the JDBC ClickHouse sink connector to stream data from {pulsar-short} topics into https://clickhouse.com/[ClickHouse] tables.
+ +[#compatibility] +== Compatibility + +include::common:streaming:partial$connectors/sinks/jdbc-clickhouse-compatibility.adoc[] + +== Create the connector + +include::ROOT:partial$sinks/get-started.adoc[] + +== Edit the connector + +include::ROOT:partial$sinks/edit.adoc[] + +== Manage the connector + +include::common:streaming:partial$connectors/manage.adoc[] + +== Connector configuration reference + +include::common:streaming:partial$connectors/connector-params-intro.adoc[] + +=== {pulsar-short} sink connector properties + +include::common:streaming:partial$connectors/sinks/pulsar-config-params.adoc[] + +[#configs] +=== JDBC ClickHouse sink connector properties (`configs`) + +include::common:streaming:partial$connectors/sinks/jdbc-config-params.adoc[] \ No newline at end of file diff --git a/modules/connectors/pages/sinks/jdbc-mariadb.adoc b/modules/connectors/pages/sinks/jdbc-mariadb.adoc new file mode 100644 index 00000000..6530017d --- /dev/null +++ b/modules/connectors/pages/sinks/jdbc-mariadb.adoc @@ -0,0 +1,34 @@ += JDBC MariaDB +:connectorType: jdbc-mariadb + +You can use the JDBC MariaDB sink connector to stream data from {pulsar-short} topics into https://mariadb.org/[MariaDB] tables. 
+ +[#compatibility] +== Compatibility + +include::common:streaming:partial$connectors/sinks/jdbc-mariadb-compatibility.adoc[] + +== Create the connector + +include::ROOT:partial$sinks/get-started.adoc[] + +== Edit the connector + +include::ROOT:partial$sinks/edit.adoc[] + +== Manage the connector + +include::common:streaming:partial$connectors/manage.adoc[] + +== Connector configuration reference + +include::common:streaming:partial$connectors/connector-params-intro.adoc[] + +=== {pulsar-short} sink connector properties + +include::common:streaming:partial$connectors/sinks/pulsar-config-params.adoc[] + +[#configs] +=== JDBC MariaDB sink connector properties (`configs`) + +include::common:streaming:partial$connectors/sinks/jdbc-config-params.adoc[] \ No newline at end of file diff --git a/modules/connectors/pages/sinks/jdbc-postgres.adoc b/modules/connectors/pages/sinks/jdbc-postgres.adoc new file mode 100644 index 00000000..447ce52b --- /dev/null +++ b/modules/connectors/pages/sinks/jdbc-postgres.adoc @@ -0,0 +1,34 @@ += JDBC PostgreSQL +:connectorType: jdbc-postgres + +You can use the JDBC PostgreSQL sink connector to stream data from {pulsar-short} topics into https://www.postgresql.org/[PostgreSQL] tables. 
+ +[#compatibility] +== Compatibility + +include::common:streaming:partial$connectors/sinks/jdbc-postgres-compatibility.adoc[] + +== Create the connector + +include::ROOT:partial$sinks/get-started.adoc[] + +== Edit the connector + +include::ROOT:partial$sinks/edit.adoc[] + +== Manage the connector + +include::common:streaming:partial$connectors/manage.adoc[] + +== Connector configuration reference + +include::common:streaming:partial$connectors/connector-params-intro.adoc[] + +=== {pulsar-short} sink connector properties + +include::common:streaming:partial$connectors/sinks/pulsar-config-params.adoc[] + +[#configs] +=== JDBC PostgreSQL sink connector properties (`configs`) + +include::common:streaming:partial$connectors/sinks/jdbc-config-params.adoc[] \ No newline at end of file diff --git a/modules/connectors/pages/sinks/kafka.adoc b/modules/connectors/pages/sinks/kafka.adoc new file mode 100644 index 00000000..1586b92c --- /dev/null +++ b/modules/connectors/pages/sinks/kafka.adoc @@ -0,0 +1,37 @@ += Kafka +:connectorType: kafka + +The Kafka sink connector reads messages from {pulsar-short} topics and writes them to https://kafka.apache.org/[Kafka] topics. + +[#compatibility] +== Compatibility + +{product} supports {pulsar-reg} {pulsar-version}, which uses the https://github.com/apache/kafka/tree/2.7[Kafka 2.7.2 library]. + +== Create the connector + +include::ROOT:partial$sinks/get-started.adoc[] + +== Edit the connector + +include::ROOT:partial$sinks/edit.adoc[] + +== Manage the connector + +include::common:streaming:partial$connectors/manage.adoc[] + +== Connector configuration reference + +include::common:streaming:partial$connectors/connector-params-intro.adoc[] + +=== {pulsar-short} sink connector properties + +include::common:streaming:partial$connectors/sinks/pulsar-config-params.adoc[] + +[#configs] +=== Kafka sink connector properties (`configs`) + +Set these properties in the `configs` section of the connector configuration. 
+ +Generally, all properties provided in the https://pulsar.apache.org/docs/io-kafka-sink[OSS {pulsar} Kafka sink connector] are supported. +Exceptions include properties that aren't relevant to {product} and properties that aren't present in {pulsar} {pulsar-version}. \ No newline at end of file diff --git a/modules/connectors/pages/sinks/kinesis.adoc b/modules/connectors/pages/sinks/kinesis.adoc new file mode 100644 index 00000000..0d8def28 --- /dev/null +++ b/modules/connectors/pages/sinks/kinesis.adoc @@ -0,0 +1,39 @@ += Kinesis +:connectorType: kinesis + +The Kinesis sink connector reads messages from {pulsar-short} topics and writes them to https://aws.amazon.com/kinesis/[Amazon Kinesis]. + +[#compatibility] +== Compatibility + +The Amazon Kinesis Client Library for Java (Amazon KCL) is used to consume and process data from Amazon Kinesis in Java applications. + +{product} supports {pulsar-reg} {pulsar-version}, which uses the https://github.com/awslabs/amazon-kinesis-client[Amazon Kinesis Client 2.2.8 library] and the https://github.com/aws/aws-sdk-java[AWS Java SDK 0.14.0 library]. + +== Create the connector + +include::ROOT:partial$sinks/get-started.adoc[] + +== Edit the connector + +include::ROOT:partial$sinks/edit.adoc[] + +== Manage the connector + +include::common:streaming:partial$connectors/manage.adoc[] + +== Connector configuration reference + +include::common:streaming:partial$connectors/connector-params-intro.adoc[] + +=== {pulsar-short} sink connector properties + +include::common:streaming:partial$connectors/sinks/pulsar-config-params.adoc[] + +[#configs] +=== Kinesis sink connector properties (`configs`) + +Set these properties in the `configs` section of the connector configuration. + +Generally, all properties provided in the https://pulsar.apache.org/docs/io-kinesis-sink[OSS {pulsar} Kinesis sink connector] are supported. 
+Exceptions include properties that aren't relevant to {product} and properties that aren't present in {pulsar} {pulsar-version}. \ No newline at end of file diff --git a/modules/connectors/pages/sources/debezium-mongodb.adoc b/modules/connectors/pages/sources/debezium-mongodb.adoc new file mode 100644 index 00000000..9463b576 --- /dev/null +++ b/modules/connectors/pages/sources/debezium-mongodb.adoc @@ -0,0 +1,42 @@ += Debezium MongoDB +:connectorType: debezium-mongodb + +The Debezium MongoDB source connector tracks either a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections. +It streams those changes as messages to a {pulsar-short} topic. + +The connector automatically handles the following: + +* Addition and removal of shards in a sharded cluster. +* Changes in membership for each replica set. +* Elections within each replica set. +* Resolution of communications issues with replica set members. + +[#compatibility] +== Compatibility + +include::common:streaming:partial$connectors/sources/debezium-compatibility.adoc[] + +== Create the connector + +include::ROOT:partial$sources/get-started.adoc[] + +== Edit the connector + +include::ROOT:partial$sources/edit.adoc[] + +== Manage the connector + +include::common:streaming:partial$connectors/manage.adoc[] + +== Connector configuration reference + +include::common:streaming:partial$connectors/connector-params-intro.adoc[] + +=== {pulsar-short} source connector properties + +include::common:streaming:partial$connectors/sources/pulsar-config-params.adoc[] + +[#configs] +=== Debezium MongoDB source connector properties (`configs`) + +include::common:streaming:partial$connectors/sources/debezium-mongodb-propeties.adoc[] \ No newline at end of file diff --git a/modules/connectors/pages/sources/debezium-mysql.adoc b/modules/connectors/pages/sources/debezium-mysql.adoc new file mode 100644 index 00000000..5c979ab0 --- /dev/null +++ 
b/modules/connectors/pages/sources/debezium-mysql.adoc
@@ -0,0 +1,37 @@
+= Debezium MySQL
+:connectorType: debezium-mysql
+
+The Debezium MySQL source connector reads the `binlog` from MySQL database servers, produces change events for row-level `INSERT`, `UPDATE`, and `DELETE` operations, and then sends change event messages to {pulsar-short} topics.
+
+[#compatibility]
+== Compatibility
+
+include::common:streaming:partial$connectors/sources/debezium-compatibility.adoc[]
+
+== Create the connector
+
+include::ROOT:partial$sources/get-started.adoc[]
+
+== Edit the connector
+
+include::ROOT:partial$sources/edit.adoc[]
+
+== Manage the connector
+
+include::common:streaming:partial$connectors/manage.adoc[]
+
+== Connector configuration reference
+
+include::common:streaming:partial$connectors/connector-params-intro.adoc[]
+
+=== {pulsar-short} source connector properties
+
+include::common:streaming:partial$connectors/sources/pulsar-config-params.adoc[]
+
+[#configs]
+=== Debezium MySQL source connector properties (`configs`)
+
+Set these properties in the `configs` section of the connector configuration.
+
+Generally, all properties provided in the https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-connector-properties[Debezium connector for MySQL] and the https://pulsar.apache.org/docs/io-debezium-source[OSS {pulsar} Debezium source connector] are supported.
+Exceptions include properties that aren't relevant to {product} and properties that are only present in <<compatibility,unsupported versions>>.
\ No newline at end of file
diff --git a/modules/connectors/pages/sources/debezium-postgres.adoc b/modules/connectors/pages/sources/debezium-postgres.adoc
new file mode 100644
index 00000000..2e398b10
--- /dev/null
+++ b/modules/connectors/pages/sources/debezium-postgres.adoc
@@ -0,0 +1,37 @@
+= Debezium PostgreSQL
+:connectorType: debezium-postgres
+
+The Debezium PostgreSQL source connector produces a change event for every row-level `INSERT`, `UPDATE`, and `DELETE` operation that it captures, and then it sends change event records for each table to separate {pulsar-short} topics.
+
+[#compatibility]
+== Compatibility
+
+include::common:streaming:partial$connectors/sources/debezium-compatibility.adoc[]
+
+== Create the connector
+
+include::ROOT:partial$sources/get-started.adoc[]
+
+== Edit the connector
+
+include::ROOT:partial$sources/edit.adoc[]
+
+== Manage the connector
+
+include::common:streaming:partial$connectors/manage.adoc[]
+
+== Connector configuration reference
+
+include::common:streaming:partial$connectors/connector-params-intro.adoc[]
+
+=== {pulsar-short} source connector properties
+
+include::common:streaming:partial$connectors/sources/pulsar-config-params.adoc[]
+
+[#configs]
+=== Debezium PostgreSQL source connector properties (`configs`)
+
+Set these properties in the `configs` section of the connector configuration.
+
+Generally, all properties provided in the https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties[Debezium connector for PostgreSQL] and the https://pulsar.apache.org/docs/io-debezium-source[OSS {pulsar} Debezium source connector] are supported.
+Exceptions include properties that aren't relevant to {product} and properties that are only present in <<compatibility,unsupported versions>>.
\ No newline at end of file diff --git a/modules/connectors/pages/sources/kafka.adoc b/modules/connectors/pages/sources/kafka.adoc new file mode 100644 index 00000000..5ac91104 --- /dev/null +++ b/modules/connectors/pages/sources/kafka.adoc @@ -0,0 +1,32 @@ += Kafka +:connectorType: kafka + +The Kafka source connector pulls data from https://kafka.apache.org/[Kafka] topics and persists it to {pulsar-short} topics. + +== Create the connector + +include::ROOT:partial$sources/get-started.adoc[] + +== Edit the connector + +include::ROOT:partial$sources/edit.adoc[] + +== Manage the connector + +include::common:streaming:partial$connectors/manage.adoc[] + +== Connector configuration reference + +include::common:streaming:partial$connectors/connector-params-intro.adoc[] + +=== {pulsar-short} source connector properties + +include::common:streaming:partial$connectors/sources/pulsar-config-params.adoc[] + +[#configs] +=== Kafka source connector properties (`configs`) + +Set these properties in the `configs` section of the connector configuration. + +Generally, all properties provided in the https://pulsar.apache.org/docs/io-kafka-source[OSS {pulsar} Kafka source connector] are supported. +Exceptions include properties that aren't relevant to {product} and properties that aren't present in {pulsar} {pulsar-version}. \ No newline at end of file diff --git a/modules/connectors/pages/sources/kinesis.adoc b/modules/connectors/pages/sources/kinesis.adoc new file mode 100644 index 00000000..5edd895a --- /dev/null +++ b/modules/connectors/pages/sources/kinesis.adoc @@ -0,0 +1,32 @@ += Kinesis +:connectorType: kinesis + +The Kinesis source connector pulls data from https://aws.amazon.com/kinesis/[Amazon Kinesis] and persists it to {pulsar-short} topics. 
+ +== Create the connector + +include::ROOT:partial$sources/get-started.adoc[] + +== Edit the connector + +include::ROOT:partial$sources/edit.adoc[] + +== Manage the connector + +include::common:streaming:partial$connectors/manage.adoc[] + +== Connector configuration reference + +include::common:streaming:partial$connectors/connector-params-intro.adoc[] + +=== {pulsar-short} source connector properties + +include::common:streaming:partial$connectors/sources/pulsar-config-params.adoc[] + +[#configs] +=== Kinesis source connector properties (`configs`) + +Set these properties in the `configs` section of the connector configuration. + +Generally, all properties provided in the https://pulsar.apache.org/docs/io-kinesis-source[OSS {pulsar} Kinesis source connector] are supported. +Exceptions include properties that aren't relevant to {product} and properties that aren't present in {pulsar} {pulsar-version}. \ No newline at end of file diff --git a/modules/operations/pages/io-connectors.adoc b/modules/operations/pages/io-connectors.adoc deleted file mode 100644 index 28c5d142..00000000 --- a/modules/operations/pages/io-connectors.adoc +++ /dev/null @@ -1,105 +0,0 @@ -= Luna Streaming IO connectors - -When you have Luna Streaming xref:install-upgrade:quickstart-server-installs.adoc[installed] and running, add IO connectors to connect your deployment to external systems like https://cassandra.apache.org/_/index.html[Apache Cassandra], https://www.elastic.co/[ElasticSearch], and more. - -* xref:io-connectors.adoc#sink-connectors[Source connectors]: Source connectors read messages from external topics and persist the messages to {pulsar-reg} topics. For more, see https://pulsar.apache.org/docs/en/io-connectors/#source-connector[{pulsar-short} built-in connectors]. - -* xref:io-connectors.adoc#source-connectors[Sink connectors]: Sink connectors read messages from {pulsar-short} topics and persist the messages to external systems. 
For more, see https://pulsar.apache.org/docs/en/io-connectors/#sink-connector[{pulsar-short} built-in connectors]. - -This doc lists the connectors supported by *Luna Streaming*. - -[#sink-connectors] -== Sink Connectors - -*Sink connectors* read messages from {pulsar-short} topics and persist the messages to external systems. - -The following sink connectors are included in the `` deployment and are supported by {company} Luna Streaming. - -[#datastax-pulsar-sink] -=== {company} enhanced Cassandra sink connector - -To configure, deploy, and use the {company} enhanced Cassandra sink connector, see the xref:pulsar-connector:ROOT:index.adoc[{company} {pulsar} Connector documentation]. - -The {company} enhanced Cassandra sink connector offers the following advantages over the OSS {pulsar-short} Cassandra sink connector: - -* Flexibility in mapping {pulsar-reg} messages to DSE and Cassandra tables. - -* Enterprise grade security support including built-in SSL, and LDAP integration. - -* Consumes all {pulsar-reg} primitives including primitives, JSON and Avro formats. - -* Flexible time/date formatting. - -* Configurable consistency level. - -* Row-level Time-to-Live (TTL). - -* Distributed mode, high availability (HA) support. - -* Standalone mode support for development. - -[#elasticsearch-sink] -=== ElasticSearch sink - -To configure, deploy, and use the ElasticSearch sink connector, see the xref:io-elastic-sink.adoc[Elasticsearch sink connector documentation]. - -[#jdbc-clickhouse-sink] -=== JDBC-Clickhouse sink - -To configure, deploy, and use the JDBC-Clickhouse sink connector, see the https://pulsar.apache.org/docs/next/io-jdbc-sink/[{pulsar-short} documentation]. - -[#jdbc-mariadb-sink] -=== JDBC-MariaDB sink - -To configure, deploy, and use the JDBC-MariaDB sink connector, see the https://pulsar.apache.org/docs/next/io-jdbc-sink#example-for-mariadb[{pulsar-short} documentation]. 
- -[#jdbc-postgres-sink] -=== JDBC-PostgreSQL sink - -To configure, deploy, and use the JDBC-PostgreSQL connector, see the https://pulsar.apache.org/docs/next/io-jdbc-sink#example-for-postgresql[{pulsar-short} documentation]. - -[#kafka-sink] -=== Kafka sink - -To configure, deploy, and use the Kafka sink connector, see the https://pulsar.apache.org/docs/next/io-kafka-sink#configuration[{pulsar-short} documentation]. - -[#kinesis-sink] -=== Kinesis sink - -To configure, deploy, and use the Kinesis sink connector, see the https://pulsar.apache.org/docs/next/io-kinesis-sink#configuration[{pulsar-short} documentation]. - -[#source-connectors] -== Source Connectors - -*Source connectors* read messages from external topics and persist the messages to {pulsar-short} topics. - -The following sink connectors are included in the `` deployment and are supported by {company} Luna Streaming. - -[#debezium-mongodb-source] -=== Debezium MongoDB source - -To configure, deploy, and use the Debezium MongoDB source connector, see the https://pulsar.apache.org/docs/next/io-debezium-source#mongodb-configuration[{pulsar-short} documentation]. - -[#debezium-mysql-source] -=== Debezium MySQL source - -To configure, deploy, and use the Debezium MySQL source connector, see the https://pulsar.apache.org/docs/next/io-debezium-source#configuration-1[{pulsar-short} documentation]. - -[#debezium-postgres-source] -=== Debezium Postgres source - -To configure, deploy, and use the Debezium PostgreSQL source connector, see the https://pulsar.apache.org/docs/next/io-debezium-source#configuration-2[{pulsar-short} documentation]. - -[#kafka-source] -=== Kafka source - -To configure, deploy, and use the Kafka source connector, see the https://pulsar.apache.org/docs/next/io-kafka-source#configuration[{pulsar-short} documentation]. 
- -[#kinesis-source] -=== Kinesis source - -To configure, deploy, and use the Kinesis source connector, see the https://pulsar.apache.org/docs/next/io-kinesis-source#configuration[{pulsar-short} documentation]. - -== Next steps - -For more on {pulsar-short} IO connectors, see the https://pulsar.apache.org/docs/en/io-overview/[{pulsar-short} documentation]. \ No newline at end of file diff --git a/modules/operations/pages/io-elastic-sink.adoc b/modules/operations/pages/io-elastic-sink.adoc deleted file mode 100644 index eaf024ac..00000000 --- a/modules/operations/pages/io-elastic-sink.adoc +++ /dev/null @@ -1,353 +0,0 @@ -= Elasticsearch sink connector - -The https://www.elastic.co/elasticsearch/[Elasticsearch] sink connector reads messages from {pulsar-short} topics and persists messages to indexes. - -* xref:io-elastic-sink.adoc#configuration[Configuration] -* xref:io-elastic-sink.adoc#ssl-configuration[ElasticSearchSslConfig properties] -* xref:io-elastic-sink.adoc#example[Example sink connector] -* xref:io-elastic-sink.adoc#elastic-tls[Example Elasticsearch sink with TLS] - -[#configuration] -== Configuration - -The configuration of the Elasticsearch sink connector has the following properties. - -[cols="2,2,1,1,3"] -|=== -|*Name* -|*Type* -|*Required* -|*Default* -|*Description* - -| `apiKey` | String| false | " " (empty string)|The apiKey used by the connector to connect to the ElasticSearch cluster. Only one between basic/token/apiKey authentication mode must be configured. -| `bulkActions` | Integer | false | 1000 | The maximum number of actions per elasticsearch bulk request. Use -1 to disable it. -| `bulkConcurrentRequests` | Integer | false | 0 | The maximum number of in flight elasticsearch bulk requests. The default 0 allows the execution of a single request. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests. 
-| `bulkEnabled` | Boolean | false | false | Enable the elasticsearch bulk processor to flush write requests based on the number or size of requests, or after a given period. -| `bulkFlushIntervalInMs` | Long | false | 1000 | The maximum period of time to wait for flushing pending writes when bulk writes are enabled. -1 or zero means the scheduled flushing is disabled. -| `bulkSizeInMb` | Integer | false |5 | The maximum size in megabytes of elasticsearch bulk requests. Use -1 to disable it. -| `canonicalKeyFields` | Boolean | false | false | Whether to sort the key fields for JSON and Avro or not. If it is set to `true` and the record key schema is `JSON` or `AVRO`, the serialized object does not consider the order of properties. -| `compatibilityMode` | enum (AUTO,ELASTICSEARCH,ELASTICSEARCH_7,OPENSEARCH) | false | AUTO | Specify compatibility mode with the ElasticSearch cluster. `AUTO` value will try to auto detect the correct compatibility mode to use. Use `ELASTICSEARCH_7` if the target cluster is running ElasticSearch 7 or prior. Use `ELASTICSEARCH` if the target cluster is running ElasticSearch 8 or higher. Use `OPENSEARCH` if the target cluster is running OpenSearch. -| `compressionEnabled` | Boolean | false |false | Enable elasticsearch request compression. -| `connectionIdleTimeoutInMs` | Integer | false |5 | Idle connection timeout to prevent a read timeout. -| `connectionRequestTimeoutInMs` | Integer | false |1000 | The time in milliseconds for getting a connection from the elasticsearch connection pool. -| `connectTimeoutInMs` | Integer | false |5000 | The elasticsearch client connection timeout in milliseconds. -| `createIndexIfNeeded` | Boolean | false | false | Manage index if missing. -| `elasticSearchUrl` | String| true | " " (empty string)| The URL of elastic search cluster to which the connector connects. -| `idHashingAlgorithm` | enum(NONE,SHA256,SHA512)| false | NONE|Hashing algorithm to use for the document id. 
This is useful in order to be compliant with the ElasticSearch _id hard limit of 512 bytes. -| `indexName` | String| false |" " (empty string)| The index name to which the connector writes messages. The default value is the topic name. It accepts date formats in the name to support event time based index with the pattern `%{+}`. For example, suppose the event time of the record is 1645182000000L, the indexName is `logs-%{+yyyy-MM-dd}`, then the formatted index name would be `logs-2022-02-18`. -| `indexNumberOfReplicas` | int| false |1 | The number of replicas of the index. -| `indexNumberOfShards` | int| false |1| The number of shards of the index. -| `keyIgnore` | Boolean | false |true | Whether to ignore the record key to build the Elasticsearch document `_id`. If primaryFields is defined, the connector extract the primary fields from the payload to build the document `_id` If no primaryFields are provided, elasticsearch auto generates a random document `_id`. -| `malformedDocAction` | enum (IGNORE,WARN,FAIL) | false | FAIL | How to handle elasticsearch rejected documents due to some malformation. Possible options are IGNORE, DELETE or FAIL. Default is FAIL the Elasticsearch document. -| `maxRetries` | Integer | false | 1 | The maximum number of retries for elasticsearch requests. Use -1 to disable it. -| `maxRetryTimeInSec` | Integer| false | 86400 | The maximum retry time interval in seconds for retrying an elasticsearch request. -| `nullValueAction` | enum (IGNORE,DELETE,FAIL) | false | IGNORE | How to handle records with null values, possible options are IGNORE, DELETE or FAIL. Default is IGNORE the message. -| `password` | String| false | " " (empty string)|The password used by the connector to connect to the elastic search cluster.

If `username` is set, then `password` should also be provided. -| `primaryFields` | String | false | "id" | The comma separated ordered list of field names used to build the Elasticsearch document `_id` from the record value. If this list is a singleton, the field is converted as a string. If this list has 2 or more fields, the generated `_id` is a string representation of a JSON array of the field values. -| `retryBackoffInMs` | Integer | false | 100 | The base time to wait when retrying an Elasticsearch request (in milliseconds). -| `schemaEnable` | Boolean | false | false | Turn on the Schema Aware mode. -| `socketTimeoutInMs` | Integer | false |60000 | The socket timeout in milliseconds waiting to read the elasticsearch response. -| `ssl` | ElasticSearchSslConfig | false | string | Configuration for TLS encrypted communication. See xref:io-elastic-sink.adoc#ssl-configuration[]. -| `stripNonPrintableCharacters` | Boolean| false | true| Whether to remove all non-printable characters from the document or not. If it is set to true, all non-printable characters are removed from the document. -| `stripNulls` | Boolean | false |true | If stripNulls is false, elasticsearch _source includes 'null' for empty fields (for example `{"foo": null}`), otherwise null fields are stripped. -| `token` | String| false | " " (empty string)|The token used by the connector to connect to the ElasticSearch cluster. Only one between basic/token/apiKey authentication mode must be configured. -| `typeName` | String | false | "_doc" | The type name to which the connector writes messages to. The value should be set explicitly to a valid type name other than "_doc" for Elasticsearch version before 6.2, and left to default otherwise. -| `username` | String| false |" " (empty string)| The username used by the connector to connect to the elastic search cluster. If `username` is set, then `password` should also be provided. 
-|=== - -[#ssl-configuration] -=== ElasticSearchSslConfig structure - -[cols="2,1,1,1,3"] -|=== -|*Name* -|*Type* -|*Required* -|*Default* -|*Description* - -| `cipherSuites` | String| false |" " (empty string)| SSL/TLS cipher suites. -| `disableCertificateValidation` | Boolean| false | true | Whether or not to disable the node certificate validation. Changing this value is highly insecure and you should not use it in production environment. -| `enabled` | Boolean| false | false | Enable SSL/TLS. -| `hostnameVerification` | Boolean| false | true | Whether or not to validate node hostnames when using SSL. -| `keystorePassword` | String| false |" " (empty string)| Keystore password. -| `keystorePath` | String| false |" " (empty string)| The path to the keystore file. -| `protocols` | String| false |"TLSv1.2" | Comma separated list of enabled SSL/TLS protocols. -| `truststorePassword` | String| false |" " (empty string)| Truststore password. -| `truststorePath` | String| false |" " (empty string)| The path to the truststore file. -|=== - -[#example] -== Example - -=== Requirements - -To deploy an Elasticsearch sink connector, the following are required: - -- Elasticsearch 7 (Elasticsearch 8 will be supported in the future) -- OpenSearch 1.x - -=== Usage - -. Create a JSON or YAML configuration file. -+ -[tabs] -==== -JSON:: -+ --- -[source,json] ----- -{ - "configs": { - "elasticSearchUrl": "http://localhost:9200", - "indexName": "my_index", - "username": "username", - "password": "password" - } -} ----- --- -+ -YAML:: -+ --- -[source,yaml] ----- -configs: - elasticSearchUrl: "http://localhost:9200" - indexName: "my_index" - username: "username" - password: "password" ----- --- -==== -+ -[NOTE] -==== -For Elasticsearch versions before *6.2*, the value of `typeName` is required, and should be set explicitly to a valid type name *other than* "_doc". -==== - -. Start a single node Elasticsearch cluster. 
-+ -[source,bash] ----- -$ docker run -p 9200:9200 -p 9300:9300 \ - -e "discovery.type=single-node" \ - docker.elastic.co/elasticsearch/elasticsearch:7.13.3 ----- - -. Start a {pulsar-short} service locally in standalone mode. -+ -[source,bash] ----- -$ bin/pulsar standalone ----- -+ -. Make sure the connector NAR file is available at `connectors/pulsar-io-elastic-search-@pulsar:version@.nar`. -+ -. Start the {pulsar-short} Elasticsearch connector in local run mode using the JSON or YAML configuration file. -+ -[tabs] -==== -JSON:: -+ --- -[source,bash] ----- -$ bin/pulsar-admin sinks localrun \ - --archive connectors/pulsar-io-elastic-search-@pulsar:version@.nar \ - --tenant public \ - --namespace default \ - --name elasticsearch-test-sink \ - --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index","username": "username","password": "password"}' \ - --inputs elasticsearch_test ----- --- -+ -YAML:: -+ --- -[source,bash] ----- -$ bin/pulsar-admin sinks localrun \ - --archive connectors/pulsar-io-elastic-search-@pulsar:version@.nar \ - --tenant public \ - --namespace default \ - --name elasticsearch-test-sink \ - --sink-config-file elasticsearch-sink.yml \ - --inputs elasticsearch_test ----- --- -==== - -. Publish records to the topic. -+ -[source,bash] ----- -$ bin/pulsar-client produce elasticsearch_test --messages "{\"a\":1}" ----- - -. Check documents in Elasticsearch. - -.. Refresh the index. -+ -[source,bash] ----- -$ curl -s http://localhost:9200/my_index/_refresh ----- - -.. Search documents. -+ -[source,bash] ----- -$ curl -s http://localhost:9200/my_index/_search ----- - -.. You can see the record that published earlier has been successfully written into Elasticsearch. 
-+ -[source,json] ----- -{"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"my_index","_type":"_doc","_id":"FSxemm8BLjG_iC0EeTYJ","_score":1.0,"_source":{"a":1}}]}} ----- - -[#elastic-tls] -== Example with TLS enabled - -Enable Transport Layer Security (TLS) on your Elasticsearch cluster to encrypt network traffic from eavesdropping. - -. Create a JSON or YAML configuration file with TLS/SSL enabled. -+ -[tabs] -==== -JSON:: -+ --- -[source,json] ----- -{ - "configs": { - "elasticSearchUrl": "http://localhost:9200", - "indexName": "my_index", - "username": "username", - "password": "password", - "ssl": { - "enabled": true - "truststorePath": "/pulsar/security/truststore.jks" - "truststorePassword": "truststorepass" - "keystorePath": "/pulsar/security/keystore.jks" - "keystorePassword": "keystorepass" - } - } -} ----- --- -+ -YAML:: -+ --- -[source,yaml] ----- -configs: - elasticSearchUrl: "http://localhost:9200" - indexName: "my_index" - username: "username" - password: "password" - ssl: - enabled: true - truststorePath: "/pulsar/security/truststore.jks" - truststorePassword: "truststorepass" - keystorePath: ""/pulsar/security/keystore.jks"" - keystorePassword: "keystorepass" ----- --- -==== - -. Start a single node Elasticsearch cluster. -+ -[source,bash] ----- -$ docker run -p 9200:9200 -p 9300:9300 \ - -e "discovery.type=single-node" \ - docker.elastic.co/elasticsearch/elasticsearch:7.13.3 ----- - -. Start a {pulsar-short} service locally in standalone mode. -+ -[source,bash] ----- -$ bin/pulsar standalone ----- -+ -. Make sure the connector NAR file is available at `connectors/pulsar-io-elastic-search-@pulsar:version@.nar`. -+ -. Start the {pulsar-short} Elasticsearch connector in local run mode using the JSON or YAML configuration file. 
-+ -[tabs] -==== -JSON:: -+ --- -[source,bash] ----- -$ bin/pulsar-admin sinks localrun \ - --archive connectors/pulsar-io-elastic-search-@pulsar:version@.nar \ - --tenant public \ - --namespace default \ - --name elasticsearch-test-sink \ - --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index","username": "username","password": "password",ssl: {"enabled": true,"truststorePath": "/pulsar/security/truststore.jks","truststorePassword": "truststorepass","keystorePath": "/pulsar/security/keystore.jks","keystorePassword": "keystorepass"}}' \ - --inputs elasticsearch_test ----- --- -+ -YAML:: -+ --- -[source,bash] ----- -$ bin/pulsar-admin sinks localrun \ - --archive connectors/pulsar-io-elastic-search-@pulsar:version@.nar \ - --tenant public \ - --namespace default \ - --name elasticsearch-test-sink \ - --sink-config-file elasticsearch-sink.yml \ - --inputs elasticsearch_test ----- --- -==== - -. Publish records to the topic. -+ -[source,bash] ----- -$ bin/pulsar-client produce elasticsearch_test --messages "{\"a\":1}" ----- - -. Check documents in Elasticsearch. - -.. Refresh the index. -+ -[source,bash] ----- -$ curl -s http://localhost:9200/my_index/_refresh ----- - -.. Search documents. -+ -[source,bash] ----- -$ curl -s http://localhost:9200/my_index/_search ----- - -.. You can see the record that published earlier has been successfully written into Elasticsearch. -+ -[source,json] ----- -{"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"my_index","_type":"_doc","_id":"FSxemm8BLjG_iC0EeTYJ","_score":1.0,"_source":{"a":1}}]}} ----- - -== Next steps - -For more sink connectors, see xref:io-connectors.adoc#sink-connectors[Luna Streaming sink connectors]. -For more source connectors, see xref:io-connectors.adoc#source-connectors[Luna Streaming source connectors]. \ No newline at end of file