Add GCP Pub Sub Source Connector, FromJSON SMT #37
The Google Cloud Pub/Sub Source Connector can be used to ingest data from a topic/subscription. The connector knows which GCP subscription to listen for messages on using the following configuration properties:

- `gcp.pubsub.project.id`
- `gcp.pubsub.topic.id`
- `gcp.pubsub.subscription.id`

The GCP private key credentials are loaded from a file located on the Docker container file system and can be downloaded from the GCP console:

- `gcp.pubsub.credentials.path`

Alternatively, the JSON file contents can be inlined:

- `gcp.pubsub.credentials.json`

The full list of connector configuration properties can be found at:

- https://docs.confluent.io/kafka-connectors/gcp-pubsub/current/configuration_options.html#pubsub-source-connector-config

---

The scenario covered by this connector expects a JSON string to be published to the GCP topic. To convert that JSON object to a Kafka Connect Struct, the `FromJson` SMT is used:

- `com.github.jcustenborder.kafka.connect.json.FromJson$Value`

The `FromJson` SMT requires a JSON schema so that it can map the JSON properties to the Struct fields. This is done by referencing a JSON file on the Docker container file system:

- `"transforms.fromJson.json.schema.location": "Url"`
- `"transforms.fromJson.json.schema.url": "file:///schemas/FromJson-pub-sub-schema.json"`

The JSON schema can also be inlined:

- `"transforms.fromJson.json.schema.location": "Inline"`
- `json.schema.inline`

The payload from GCP is stored in the `MessageData` field, from which it is extracted.

---

The target Kafka topic has a JSON schema applied to its value subject to ensure only valid data is produced to the topic. This converter performs the JSON schema validation:

- `"value.converter": "io.confluent.connect.json.JsonSchemaConverter"`

If the JSON object in the value of the record does not conform to the JSON schema, the record will be rejected and an error with the JSON validation details will be logged.
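Putting the properties above together, a connector configuration submitted to Connect might look roughly like the sketch below. The connector class name, the `kafka.topic` property, the project/topic/subscription IDs, and the Schema Registry URL are illustrative assumptions, not taken from this PR:

```json
{
  "name": "gcp-pubsub-source",
  "config": {
    "connector.class": "io.confluent.connect.gcp.pubsub.PubSubSourceConnector",

    "gcp.pubsub.project.id": "my-gcp-project",
    "gcp.pubsub.topic.id": "orders",
    "gcp.pubsub.subscription.id": "orders-sub",
    "gcp.pubsub.credentials.path": "/credentials/gcp-credentials.json",

    "kafka.topic": "orders",

    "transforms": "fromJson",
    "transforms.fromJson.type": "com.github.jcustenborder.kafka.connect.json.FromJson$Value",
    "transforms.fromJson.json.schema.location": "Url",
    "transforms.fromJson.json.schema.url": "file:///schemas/FromJson-pub-sub-schema.json",

    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
```

Consult the configuration reference linked above for the authoritative property names and defaults.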
---

We use the `ValueToKey`/`ExtractField$Key` SMTs to extract the `OrderNumber` and use it as the Kafka record key:

- https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html

See more at:

- https://www.confluent.io/hub/confluentinc/kafka-connect-gcp-pubsub
- https://github.com/jcustenborder/kafka-connect-json-schema
- https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-json-schema/transformations/FromJson.html
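The effect of that SMT chain can be sketched outside Connect as well. The following minimal, stdlib-only Python sketch mimics what `ValueToKey` followed by `ExtractField$Key` does to a record (the `OrderNumber`/`Amount` field names are illustrative):

```python
import json


def value_to_key_extract(value: dict, field: str = "OrderNumber"):
    """Return (key, value) as the two SMTs would leave the record.

    ValueToKey copies the named field from the value into a key struct,
    then ExtractField$Key unwraps that struct to the bare field value.
    """
    key_struct = {field: value[field]}  # ValueToKey: {"OrderNumber": ...}
    key = key_struct[field]             # ExtractField$Key: bare OrderNumber
    return key, value


# Simulate a record value produced by the FromJson SMT.
record_value = json.loads('{"OrderNumber": "A-100", "Amount": 9.99}')
key, value = value_to_key_extract(record_value)
# key is now "A-100"; the value is left unchanged.
```

In Connect itself the same result comes purely from configuration; no custom code is required.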