Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ USER appuser
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.7.6 && \
confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.6.5

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-gcp-pubsub:1.2.5 && \
confluent-hub install --no-prompt jcustenborder/kafka-connect-json-schema:0.2.5

USER root
RUN yum install jq -y

Expand All @@ -26,6 +29,8 @@ USER appuser

COPY connect-scripts /connect-scripts
COPY ./connect-smt-lib/build/libs/connect-smt-lib-*.jar /usr/share/java/kafka
COPY ./credentials /credentials
COPY schemas/gcp-pubsub-FromJson-schema.json /schemas/gcp-pubsub-FromJson-schema.json
COPY connect-connector-configs /connect-connector-configs

ENTRYPOINT ["sh","/connect-scripts/connect-entrypoint.sh"]
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
* [Container logs](#container-logs)
* [Running Integration tests once](#running-integration-tests-once)
* [JaCoCo coverage](#jacoco-coverage)
* [Manual Tests](#manual-tests)
* [GCP Pub Sub Source Connector](#gcp-pub-sub-source-connector)
* [TODOs](#todos)
<!-- TOC -->

Expand Down Expand Up @@ -308,6 +310,35 @@ directory.

---

## Manual Tests

### GCP Pub Sub Source Connector

1. Create the Kafka Topic specified in
the [connector configuration](connect-connector-configs/gcp-pubsub-source-connector.json)
2. Apply the [JSON schema](schemas/gcp-pubsub-kafka-topic-schema.json) to the Topic value subject
3. Log in to the [GCP console](https://console.cloud.google.com)
4. Add the private key credentials file contents from the GCP console for a Service Account that has the
`Pub/Sub Subscriber` role - https://console.cloud.google.com/iam-admin/serviceaccounts
5. Create the GCP topic and subscription specified in
the [connector configuration](connect-connector-configs/gcp-pubsub-source-connector.json)
6. Run this command from a local terminal to produce a message on the GCP topic:
- `gcloud pubsub topics publish kafka-connect-topic --message="{\"OrderNumber\":\"8caebe13-3f79-4861-88df-a7953424381b\",\"SiteId\":\"SITE123456\",\"Locale\":\"en-US\",\"CreatedTime\":\"2024-06-19T11:49:46.936983\",\"AddressId\":123,\"OrderValue\":999.99}"`
- the expanded payload looks like this:
```json
{
"OrderNumber": "8caebe13-3f79-4861-88df-a7953424381b",
"SiteId": "SITE123456",
"Locale": "en-US",
"CreatedTime": "2024-06-19T11:49:46.936983",
"AddressId": 123,
"OrderValue": 999.99
}
```
7. Check the Confluent Control Center UI for a record being produced to the Kafka Topic

---

## TODOs

Some outstanding tasks to make the project more complete can be found [here](todo/README.md)
35 changes: 35 additions & 0 deletions connect-connector-configs/gcp-pubsub-source-connector.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"name": "gcp-pubsub-source-connector",
"config": {
"connector.class": "io.confluent.connect.gcp.pubsub.PubSubSourceConnector",
"tasks.max": "1",
"confluent.topic.bootstrap.servers": "broker:29092",
"gcp.pubsub.project.id": "be-mygwtproject",
"gcp.pubsub.topic.id": "kafka-connect-topic",
"gcp.pubsub.subscription.id": "kafka-connect-topic-sub",
"gcp.pubsub.credentials.path": "/credentials/gcp-pubsub-credentials.json",
"gcp.pubsub.data.format": "utf_8",
"kafka.topic": "pub-sub-topic",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "false",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.auto.register.schemas": "false",
"value.converter.use.latest.version": "true",
"value.converter.latest.compatibility.strict": "false",
"value.converter.json.fail.invalid.schema": "true",
"transforms": "extractValue,fromJson,valueToKey,extractKey",
"transforms.extractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractValue.field": "MessageData",
"transforms.fromJson.type": "com.github.jcustenborder.kafka.connect.json.FromJson$Value",
"transforms.fromJson.json.schema.location": "Url",
"transforms.fromJson.json.schema.url": "file:///schemas/gcp-pubsub-FromJson-schema.json",
"transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.valueToKey.fields": "OrderNumber",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "OrderNumber",
"errors.tolerance": "all",
"errors.log.include.messages": "true",
"errors.log.enable": "true"
}
}
13 changes: 13 additions & 0 deletions credentials/gcp-pubsub-credentials.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"type": "service_account",
"project_id": "",
"private_key_id": "",
"private_key": "",
"client_email": "",
"client_id": "",
"auth_uri": "",
"token_uri": "",
"auth_provider_x509_cert_url": "",
"client_x509_cert_url": "",
"universe_domain": ""
}
25 changes: 25 additions & 0 deletions schemas/gcp-pubsub-FromJson-schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "PubSubData",
"type": "object",
"properties": {
"OrderNumber": {
"type": "string"
},
"SiteId": {
"type": "string"
},
"Locale": {
"type": "string"
},
"CreatedTime": {
"type": "string"
},
"AddressId": {
"type": "integer"
},
"OrderValue": {
"type": "number"
}
}
}
32 changes: 32 additions & 0 deletions schemas/gcp-pubsub-kafka-topic-schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "JSON Data sourced from GCP Pub Sub",
"title": "pub-sub-data",
"type": "object",
"properties": {
"AddressId": {
"type": "integer"
},
"CreatedTime": {
"maxLength": 26,
"type": "string"
},
"Locale": {
"type": "string",
"minLength": 5,
"maxLength": 5
},
"OrderNumber": {
"maxLength": 36,
"type": "string"
},
"OrderValue": {
"type": "number",
"minimum": 10
},
"SiteId": {
"maxLength": 10,
"type": "string"
}
}
}